Repository: beam
Updated Branches:
  refs/heads/master 5fe78440b -> f03f6ac19


[BEAM-1205] Auto set "enableAbandonedNodeEnforcement" in TestPipeline


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50daea28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50daea28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50daea28

Branch: refs/heads/master
Commit: 50daea288b9c5df2b481e5e6bea153796c03830a
Parents: 5fe7844
Author: Stas Levin <stasle...@gmail.com>
Authored: Thu Dec 22 19:13:01 2016 +0200
Committer: Stas Levin <stasle...@apache.org>
Committed: Thu Feb 16 11:18:09 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/Annotations.java    |  72 +++
 .../apache/beam/sdk/testing/TestPipeline.java   |  66 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java    |   6 +-
 .../beam/sdk/testing/TestPipelineTest.java      | 504 +++++++++++--------
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java |   9 +-
 5 files changed, 417 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java
new file mode 100644
index 0000000..e560226
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.beam.sdk.testing;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.lang.annotation.Annotation;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+import org.junit.experimental.categories.Category;
+
+/**
+ * A utility class for querying annotations.
+ */
+class Annotations {
+
+  /**
+   * Annotation predicates.
+   */
+  static class Predicates {
+
+    static Predicate<Annotation> isAnnotationOfType(final Class<? extends Annotation> clazz) {
+      return new Predicate<Annotation>() {
+
+        @Override
+        public boolean apply(@Nonnull final Annotation annotation) {
+          return annotation.annotationType() != null
+              && annotation.annotationType().equals(clazz);
+        }
+      };
+    }
+
+    static Predicate<Annotation> isCategoryOf(final Class<?> value, final boolean allowDerived) {
+      return new Predicate<Annotation>() {
+
+        @Override
+        public boolean apply(@Nonnull final Annotation category) {
+          return
+              FluentIterable
+                  .from(Arrays.asList(((Category) category).value()))
+                  .anyMatch(new Predicate<Class<?>>() {
+
+                    @Override
+                    public boolean apply(final Class<?> aClass) {
+                      return
+                          allowDerived
+                              ? value.isAssignableFrom(aClass)
+                              : value.equals(aClass);
+                    }
+                  });
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 37c809a..02eefa9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -93,15 +95,18 @@ public class TestPipeline extends Pipeline implements TestRule {
 
   private static class PipelineRunEnforcement {
 
+    @SuppressWarnings("WeakerAccess")
     protected boolean enableAutoRunIfMissing;
+
     protected final Pipeline pipeline;
+
     private boolean runInvoked;
 
     private PipelineRunEnforcement(final Pipeline pipeline) {
       this.pipeline = pipeline;
     }
 
-    private void enableAutoRunIfMissing(final boolean enable) {
+    protected void enableAutoRunIfMissing(final boolean enable) {
       enableAutoRunIfMissing = enable;
     }
 
@@ -156,6 +161,12 @@ public class TestPipeline extends Pipeline implements TestRule {
       return nodeRecorder.visited;
     }
 
+    private boolean isEmptyPipeline(final Pipeline pipeline) {
+      final IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor();
+      pipeline.traverseTopologically(isEmptyVisitor);
+      return isEmptyVisitor.isEmpty();
+    }
+
     private void verifyPipelineExecution() {
       final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline);
       if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) {
@@ -169,11 +180,11 @@ public class TestPipeline extends Pipeline implements TestRule {
           throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
         }
       } else if (runVisitedNodes == null && !enableAutoRunIfMissing) {
-        IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor();
-        pipeline.traverseTopologically(isEmptyVisitor);
-
-        if (!isEmptyVisitor.isEmpty()) {
-          throw new PipelineRunMissingException("The pipeline has not been run.");
+        if (!isEmptyPipeline(pipeline)) {
+          throw new PipelineRunMissingException(
+              "The pipeline has not been run (runner: "
+                  + pipeline.getOptions().getRunner().getSimpleName()
+                  + ")");
         }
       }
     }
@@ -214,7 +225,8 @@ public class TestPipeline extends Pipeline implements TestRule {
   static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
-  private PipelineRunEnforcement enforcement = new PipelineAbandonedNodeEnforcement(this);
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  private Optional<? extends PipelineRunEnforcement> enforcement = Optional.absent();
 
   /**
    * Creates and returns a new test pipeline.
@@ -239,10 +251,35 @@ public class TestPipeline extends Pipeline implements TestRule {
   public Statement apply(final Statement statement, final Description description) {
     return new Statement() {
 
+      private void setDeducedEnforcementLevel() {
+        // if the enforcement level has not been set by the user do auto-inference
+        if (!enforcement.isPresent()) {
+
+          final boolean annotatedWithNeedsRunner =
+              FluentIterable.from(description.getAnnotations())
+                  .filter(Annotations.Predicates.isAnnotationOfType(Category.class))
+                  .anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));
+
+          final boolean crashingRunner =
+              CrashingRunner.class.isAssignableFrom(getOptions().getRunner());
+
+          checkState(
+              !(annotatedWithNeedsRunner && crashingRunner),
+              "The test was annotated with a [@%s] / [@%s] while the runner "
+                  + "was set to [%s]. Please re-check your configuration.",
+              NeedsRunner.class.getSimpleName(),
+              RunnableOnService.class.getSimpleName(),
+              CrashingRunner.class.getSimpleName());
+
+          enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner);
+        }
+      }
+
       @Override
       public void evaluate() throws Throwable {
+        setDeducedEnforcementLevel();
         statement.evaluate();
-        enforcement.afterTestCompletion();
+        enforcement.get().afterTestCompletion();
       }
     };
   }
@@ -253,6 +290,11 @@ public class TestPipeline extends Pipeline implements TestRule {
    */
   @Override
   public PipelineResult run() {
+    checkState(
+        enforcement.isPresent(),
+        "Attempted to run a pipeline while it's enforcement level was not set. Are you "
+            + "using TestPipeline without a @Rule annotation?");
+
     try {
       return super.run();
     } catch (RuntimeException exc) {
@@ -263,19 +305,21 @@ public class TestPipeline extends Pipeline implements TestRule {
         throw exc;
       }
     } finally {
-      enforcement.afterPipelineExecution();
+      enforcement.get().afterPipelineExecution();
     }
   }
 
   public TestPipeline enableAbandonedNodeEnforcement(final boolean enable) {
     enforcement =
-        enable ? new PipelineAbandonedNodeEnforcement(this) : new PipelineRunEnforcement(this);
+        enable
+            ? Optional.of(new PipelineAbandonedNodeEnforcement(this))
+            : Optional.of(new PipelineRunEnforcement(this));
 
     return this;
   }
 
   public TestPipeline enableAutoRunIfMissing(final boolean enable) {
-    enforcement.enableAutoRunIfMissing(enable);
+    enforcement.get().enableAutoRunIfMissing(enable);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index dd75e58..fc9e18b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -41,6 +40,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -53,6 +53,9 @@ public class MetricsTest implements Serializable {
   private static final String NAME = "name";
   private static final MetricName METRIC_NAME = MetricName.named(NS, NAME);
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @After
   public void tearDown() {
     MetricsEnvironment.setCurrentContainer(null);
@@ -168,7 +171,6 @@ public class MetricsTest implements Serializable {
 
   private PipelineResult runPipelineWithMetrics() {
     final Counter count = Metrics.counter(MetricsTest.class, "count");
-    Pipeline pipeline = TestPipeline.create();
     final TupleTag<Integer> output1 = new TupleTag<Integer>(){};
     final TupleTag<Integer> output2 = new TupleTag<Integer>(){};
     pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index f484566..1a7d375 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.sdk.testing;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -24,308 +25,367 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.UUID;
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
-import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Suite;
 
 /** Tests for {@link TestPipeline}. */
-@RunWith(JUnit4.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  TestPipelineTest.TestPipelineCreationTest.class,
+  TestPipelineTest.TestPipelineEnforcementsTest.WithRealPipelineRunner.class,
+  
TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class
+})
 public class TestPipelineTest implements Serializable {
-  private static final List<String> WORDS = Collections.singletonList("hi 
there");
-  private static final String DUMMY = "expected";
-
-  private final transient TestPipeline pipeline =
-      
TestPipeline.fromOptions(pipelineOptions()).enableAbandonedNodeEnforcement(true);
 
-  private final transient ExpectedException exception = 
ExpectedException.none();
-
-  @Rule public transient TestRule restoreSystemProperties = new 
RestoreSystemProperties();
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient RuleChain ruleOrder = 
RuleChain.outerRule(exception).around(pipeline);
+  /** Tests related to the creation of a {@link TestPipeline}. */
+  @RunWith(JUnit4.class)
+  public static class TestPipelineCreationTest {
+    @Rule public transient TestRule restoreSystemProperties = new 
RestoreSystemProperties();
+    @Rule public transient ExpectedException thrown = ExpectedException.none();
+    @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+    @Test
+    public void testCreationUsingDefaults() {
+      assertNotNull(pipeline);
+      assertNotNull(TestPipeline.create());
+    }
 
-  @Test
-  public void testNoTestPipelineUsed() { }
+    @Test
+    public void testCreationNotAsTestRule() {
+      thrown.expect(IllegalStateException.class);
+      thrown.expectMessage("@Rule");
 
-  @Test
-  public void testCreationUsingDefaults() {
-    assertNotNull(TestPipeline.create());
-  }
+      TestPipeline.create().run();
+    }
 
-  @Test
-  public void testCreationOfPipelineOptions() throws Exception {
-    ObjectMapper mapper = new ObjectMapper();
-    String stringOptions =
-        mapper.writeValueAsString(
-            new String[] {
-              "--runner=org.apache.beam.sdk.testing.CrashingRunner", 
"--project=testProject"
-            });
-    System.getProperties().put("beamTestPipelineOptions", stringOptions);
-    GcpOptions options = 
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
-    assertEquals(CrashingRunner.class, options.getRunner());
-    assertEquals(options.getProject(), "testProject");
-  }
+    @Test
+    public void testCreationOfPipelineOptions() throws Exception {
+      ObjectMapper mapper = new ObjectMapper();
+      String stringOptions =
+          mapper.writeValueAsString(
+              new String[] {
+                "--runner=org.apache.beam.sdk.testing.CrashingRunner", 
"--project=testProject"
+              });
+      System.getProperties().put("beamTestPipelineOptions", stringOptions);
+      GcpOptions options = 
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+      assertEquals(CrashingRunner.class, options.getRunner());
+      assertEquals(options.getProject(), "testProject");
+    }
 
-  @Test
-  public void testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase() 
throws Exception {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    assertThat(
-        options.as(ApplicationNameOptions.class).getAppName(),
-        startsWith(
-            
"TestPipelineTest-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase"));
-  }
+    @Test
+    public void 
testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase() throws 
Exception {
+      PipelineOptions options = TestPipeline.testingPipelineOptions();
+      assertThat(
+          options.as(ApplicationNameOptions.class).getAppName(),
+          startsWith(
+              "TestPipelineTest$TestPipelineCreationTest"
+                  + 
"-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase"));
+    }
 
-  @Test
-  public void testToString() {
-    assertEquals("TestPipeline#TestPipelineTest-testToString", 
TestPipeline.create().toString());
-  }
+    @Test
+    public void testToString() {
+      assertEquals(
+          
"TestPipeline#TestPipelineTest$TestPipelineCreationTest-testToString",
+          TestPipeline.create().toString());
+    }
 
-  @Test
-  public void testToStringNestedMethod() {
-    TestPipeline p = nestedMethod();
+    @Test
+    public void testToStringNestedMethod() {
+      TestPipeline p = nestedMethod();
 
-    assertEquals("TestPipeline#TestPipelineTest-testToStringNestedMethod", 
p.toString());
-    assertEquals(
-        "TestPipelineTest-testToStringNestedMethod",
-        p.getOptions().as(ApplicationNameOptions.class).getAppName());
-  }
+      assertEquals(
+          
"TestPipeline#TestPipelineTest$TestPipelineCreationTest-testToStringNestedMethod",
+          p.toString());
+      assertEquals(
+          "TestPipelineTest$TestPipelineCreationTest-testToStringNestedMethod",
+          p.getOptions().as(ApplicationNameOptions.class).getAppName());
+    }
 
-  private TestPipeline nestedMethod() {
-    return TestPipeline.create();
-  }
+    private TestPipeline nestedMethod() {
+      return TestPipeline.create();
+    }
 
-  @Test
-  public void testConvertToArgs() {
-    String[] args = new String[] {"--tempLocation=Test_Location"};
-    PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
-    String[] arr = TestPipeline.convertToArgs(options);
-    List<String> lst = Arrays.asList(arr);
-    assertEquals(lst.size(), 2);
-    assertThat(
-        lst, containsInAnyOrder("--tempLocation=Test_Location", 
"--appName=TestPipelineTest"));
-  }
+    @Test
+    public void testConvertToArgs() {
+      String[] args = new String[] {"--tempLocation=Test_Location"};
+      PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+      String[] arr = TestPipeline.convertToArgs(options);
+      List<String> lst = Arrays.asList(arr);
+      assertEquals(lst.size(), 2);
+      assertThat(
+          lst,
+          containsInAnyOrder("--tempLocation=Test_Location", 
"--appName=TestPipelineCreationTest"));
+    }
 
-  @Test
-  public void testToStringNestedClassMethod() {
-    TestPipeline p = new NestedTester().p();
+    @Test
+    public void testToStringNestedClassMethod() {
+      TestPipeline p = new NestedTester().p();
 
-    
assertEquals("TestPipeline#TestPipelineTest-testToStringNestedClassMethod", 
p.toString());
-    assertEquals(
-        "TestPipelineTest-testToStringNestedClassMethod",
-        p.getOptions().as(ApplicationNameOptions.class).getAppName());
-  }
+      assertEquals(
+          
"TestPipeline#TestPipelineTest$TestPipelineCreationTest-testToStringNestedClassMethod",
+          p.toString());
+      assertEquals(
+          
"TestPipelineTest$TestPipelineCreationTest-testToStringNestedClassMethod",
+          p.getOptions().as(ApplicationNameOptions.class).getAppName());
+    }
 
-  private static class NestedTester {
-    public TestPipeline p() {
-      return TestPipeline.create();
+    private static class NestedTester {
+      public TestPipeline p() {
+        return TestPipeline.create();
+      }
     }
-  }
 
-  @Test
-  public void testMatcherSerializationDeserialization() {
-    TestPipelineOptions opts = 
PipelineOptionsFactory.as(TestPipelineOptions.class);
-    SerializableMatcher<PipelineResult> m1 = new TestMatcher();
-    SerializableMatcher<PipelineResult> m2 = new TestMatcher();
+    @Test
+    public void testMatcherSerializationDeserialization() {
+      TestPipelineOptions opts = 
PipelineOptionsFactory.as(TestPipelineOptions.class);
+      SerializableMatcher<PipelineResult> m1 = new TestMatcher();
+      SerializableMatcher<PipelineResult> m2 = new TestMatcher();
 
-    opts.setOnCreateMatcher(m1);
-    opts.setOnSuccessMatcher(m2);
+      opts.setOnCreateMatcher(m1);
+      opts.setOnSuccessMatcher(m2);
 
-    String[] arr = TestPipeline.convertToArgs(opts);
-    TestPipelineOptions newOpts =
-        PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class);
+      String[] arr = TestPipeline.convertToArgs(opts);
+      TestPipelineOptions newOpts =
+          PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class);
 
-    assertEquals(m1, newOpts.getOnCreateMatcher());
-    assertEquals(m2, newOpts.getOnSuccessMatcher());
-  }
+      assertEquals(m1, newOpts.getOnCreateMatcher());
+      assertEquals(m2, newOpts.getOnSuccessMatcher());
+    }
 
-  @Test
-  public void testRunWithDummyEnvironmentVariableFails() {
-    System.getProperties()
-        .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, 
Boolean.toString(true));
-    TestPipeline pipeline = TestPipeline.create();
-    pipeline.apply(Create.of(1, 2, 3));
+    @Test
+    public void testRunWithDummyEnvironmentVariableFails() {
+      System.getProperties()
+          .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, 
Boolean.toString(true));
+      pipeline.apply(Create.of(1, 2, 3));
 
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Cannot call #run");
-    pipeline.run();
-  }
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("Cannot call #run");
+      pipeline.run();
+    }
 
-  /** TestMatcher is a matcher designed for testing matcher 
serialization/deserialization. */
-  public static class TestMatcher extends BaseMatcher<PipelineResult>
-      implements SerializableMatcher<PipelineResult> {
-    private final UUID uuid = UUID.randomUUID();
+    /** TestMatcher is a matcher designed for testing matcher 
serialization/deserialization. */
+    public static class TestMatcher extends BaseMatcher<PipelineResult>
+        implements SerializableMatcher<PipelineResult> {
+      private final UUID uuid = UUID.randomUUID();
 
-    @Override
-    public boolean matches(Object o) {
-      return true;
-    }
+      @Override
+      public boolean matches(Object o) {
+        return true;
+      }
 
-    @Override
-    public void describeTo(Description description) {
-      description.appendText(String.format("%tL", new Date()));
-    }
+      @Override
+      public void describeTo(Description description) {
+        description.appendText(String.format("%tL", new Date()));
+      }
 
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof TestMatcher)) {
-        return false;
+      @Override
+      public boolean equals(Object obj) {
+        if (!(obj instanceof TestMatcher)) {
+          return false;
+        }
+        TestMatcher other = (TestMatcher) obj;
+        return other.uuid.equals(uuid);
       }
-      TestMatcher other = (TestMatcher) obj;
-      return other.uuid.equals(uuid);
-    }
 
-    @Override
-    public int hashCode() {
-      return uuid.hashCode();
+      @Override
+      public int hashCode() {
+        return uuid.hashCode();
+      }
     }
   }
 
-  private static class DummyRunner extends PipelineRunner<PipelineResult> {
+  /**
+   * Tests for {@link TestPipeline}'s detection of missing {@link 
Pipeline#run()}, or abandoned
+   * (dangling) {@link PAssert} or {@link 
org.apache.beam.sdk.transforms.PTransform} nodes.
+   */
+  public static class TestPipelineEnforcementsTest implements Serializable {
+
+    private static final List<String> WORDS = Collections.singletonList("hi 
there");
+    private static final String WHATEVER = "expected";
+    private static final String P_TRANSFORM = "PTransform";
+    private static final String P_ASSERT = "PAssert";
+
+    @SuppressWarnings("UnusedReturnValue")
+    private static PCollection<String> addTransform(final PCollection<String> 
pCollection) {
+      return pCollection.apply(
+          "Map2",
+          MapElements.via(
+              new SimpleFunction<String, String>() {
+
+                @Override
+                public String apply(final String input) {
+                  return WHATEVER;
+                }
+              }));
+    }
 
-    @SuppressWarnings("unused") // used by reflection
-    public static DummyRunner fromOptions(final PipelineOptions opts) {
-      return new DummyRunner();
+    private static PCollection<String> pCollection(final Pipeline pipeline) {
+      return pipeline
+          .apply("Create", Create.of(WORDS).withCoder(StringUtf8Coder.of()))
+          .apply(
+              "Map1",
+              MapElements.via(
+                  new SimpleFunction<String, String>() {
+
+                    @Override
+                    public String apply(final String input) {
+                      return WHATEVER;
+                    }
+                  }));
     }
 
-    @Override
-    public PipelineResult run(final Pipeline pipeline) {
-      return new PipelineResult() {
+    /** Tests for {@link TestPipeline}s with a non {@link CrashingRunner}. */
+    @RunWith(JUnit4.class)
+    public static class WithRealPipelineRunner {
 
-        @Override
-        public State getState() {
-          return null;
-        }
+      private final transient ExpectedException exception = 
ExpectedException.none();
 
-        @Override
-        public State cancel() throws IOException {
-          return null;
-        }
+      private final transient TestPipeline pipeline = TestPipeline.create();
 
-        @Override
-        public State waitUntilFinish(final Duration duration) {
-          return null;
-        }
+      @Rule
+      public final transient RuleChain chain = 
RuleChain.outerRule(exception).around(pipeline);
 
-        @Override
-        public State waitUntilFinish() {
-          return null;
-        }
+      @Category(RunnableOnService.class)
+      @Test
+      public void testNormalFlow() throws Exception {
+        addTransform(pCollection(pipeline));
+        pipeline.run();
+      }
 
-        @Override
-        public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, 
T> aggregator)
-            throws AggregatorRetrievalException {
-          return null;
-        }
+      @Category(RunnableOnService.class)
+      @Test
+      public void testMissingRun() throws Exception {
+        exception.expect(TestPipeline.PipelineRunMissingException.class);
+        addTransform(pCollection(pipeline));
+      }
 
-        @Override
-        public MetricResults metrics() {
-          return null;
-        }
-      };
-    }
-  }
+      @Category(RunnableOnService.class)
+      @Test
+      public void testMissingRunWithDisabledEnforcement() throws Exception {
+        pipeline.enableAbandonedNodeEnforcement(false);
+        addTransform(pCollection(pipeline));
 
-  private static PipelineOptions pipelineOptions() {
-    final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
-    pipelineOptions.setRunner(DummyRunner.class);
-    return pipelineOptions;
-  }
+        // disable abandoned node detection
+      }
 
-  private PCollection<String> pCollection() {
-    return 
addTransform(pipeline.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())));
-  }
+      @Category(RunnableOnService.class)
+      @Test
+      public void testMissingRunAutoAdd() throws Exception {
+        pipeline.enableAutoRunIfMissing(true);
+        addTransform(pCollection(pipeline));
 
-  private PCollection<String> addTransform(final PCollection<String> 
pCollection) {
-    return pCollection.apply(
-        MapElements.via(
-            new SimpleFunction<String, String>() {
+        // have the pipeline.run() auto-added
+      }
 
-              @Override
-              public String apply(final String input) {
-                return DUMMY;
-              }
-            }));
-  }
+      @Category(RunnableOnService.class)
+      @Test
+      public void testDanglingPTransformRunnableOnService() throws Exception {
+        final PCollection<String> pCollection = pCollection(pipeline);
+        PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
+        pipeline.run().waitUntilFinish();
+
+        exception.expect(TestPipeline.AbandonedNodeException.class);
+        exception.expectMessage(P_TRANSFORM);
+        // dangling PTransform
+        addTransform(pCollection);
+      }
 
-  @Test
-  public void testPipelineRunMissing() throws Throwable {
-    exception.expect(TestPipeline.PipelineRunMissingException.class);
-    PAssert.that(pCollection()).containsInAnyOrder(DUMMY);
-    // missing pipeline#run
-  }
+      @Category(NeedsRunner.class)
+      @Test
+      public void testDanglingPTransformNeedsRunner() throws Exception {
+        final PCollection<String> pCollection = pCollection(pipeline);
+        PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
+        pipeline.run().waitUntilFinish();
+
+        exception.expect(TestPipeline.AbandonedNodeException.class);
+        exception.expectMessage(P_TRANSFORM);
+        // dangling PTransform
+        addTransform(pCollection);
+      }
+
+      @Category(RunnableOnService.class)
+      @Test
+      public void testDanglingPAssertRunnableOnService() throws Exception {
+        final PCollection<String> pCollection = pCollection(pipeline);
+        PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
+        pipeline.run().waitUntilFinish();
+
+        exception.expect(TestPipeline.AbandonedNodeException.class);
+        exception.expectMessage(P_ASSERT);
+        // dangling PAssert
+        PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
+      }
 
-  @Test
-  public void testPipelineHasAbandonedPAssertNode() throws Throwable {
-    exception.expect(TestPipeline.AbandonedNodeException.class);
-    exception.expectMessage("PAssert");
+      /**
+       * Tests that a {@link TestPipeline} rule behaves as expected when there 
is no pipeline usage
+       * within a test that has a {@link RunnableOnService} annotation.
+       */
+      @Category(RunnableOnService.class)
+      @Test
+      public void testNoTestPipelineUsedRunnableOnService() {}
+
+      /**
+       * Tests that a {@link TestPipeline} rule behaves as expected when there 
is no pipeline usage
+       * present in a test.
+       */
+      @Test
+      public void testNoTestPipelineUsedNoAnnotation() {}
+    }
 
-    final PCollection<String> pCollection = pCollection();
-    PAssert.that(pCollection).containsInAnyOrder(DUMMY);
-    pipeline.run().waitUntilFinish();
+    /** Tests for {@link TestPipeline}s with a {@link CrashingRunner}. */
+    @RunWith(JUnit4.class)
+    public static class WithCrashingPipelineRunner {
 
-    // dangling PAssert
-    PAssert.that(pCollection).containsInAnyOrder(DUMMY);
-  }
+      static {
+        System.setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, 
Boolean.TRUE.toString());
+      }
 
-  @Test
-  public void testPipelineHasAbandonedPTransformNode() throws Throwable {
-    exception.expect(TestPipeline.AbandonedNodeException.class);
-    exception.expectMessage("PTransform");
+      private final transient ExpectedException exception = 
ExpectedException.none();
 
-    final PCollection<String> pCollection = pCollection();
-    PAssert.that(pCollection).containsInAnyOrder(DUMMY);
-    pipeline.run().waitUntilFinish();
+      private final transient TestPipeline pipeline = TestPipeline.create();
 
-    // dangling PTransform
-    addTransform(pCollection);
-  }
+      @Rule
+      public final transient RuleChain chain = 
RuleChain.outerRule(exception).around(pipeline);
 
-  @Test
-  public void testNormalFlowWithPAssert() throws Throwable {
-    PAssert.that(pCollection()).containsInAnyOrder(DUMMY);
-    pipeline.run().waitUntilFinish();
-  }
+      @Test
+      public void testNoTestPipelineUsed() {}
 
-  @Test
-  public void testAutoAddMissingRunFlow() throws Throwable {
-    PAssert.that(pCollection()).containsInAnyOrder(DUMMY);
-    // missing pipeline#run, but have it auto-added.
-    pipeline.enableAutoRunIfMissing(true);
-  }
+      @Test
+      public void testMissingRun() throws Exception {
+        addTransform(pCollection(pipeline));
 
-  @Test
-  public void testDisableStrictPAssertFlow() throws Throwable {
-    pCollection();
-    // dangling PTransform, but ignore it
-    pipeline.enableAbandonedNodeEnforcement(false);
+        // pipeline.run() is missing, BUT:
+        // 1. Neither @RunnableOnService nor @NeedsRunner are present, AND
+        // 2. The runner class is CrashingRunner.class
+        // (1) + (2) => we assume this pipeline was never meant to be run, so 
no exception is
+        // thrown on account of the missing run / dangling nodes.
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index be8fbc7..8a82f40 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -27,7 +27,6 @@ import java.util.Set;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -41,6 +40,7 @@ import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -57,6 +57,9 @@ public class MqttIOTest {
 
   private static int port;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @Before
   public void startBroker() throws Exception {
     LOG.info("Finding free network port");
@@ -77,8 +80,6 @@ public class MqttIOTest {
   @Test(timeout = 60 * 1000)
   @Category(RunnableOnService.class)
   public void testRead() throws Exception {
-    final Pipeline pipeline = TestPipeline.create();
-
     PCollection<byte[]> output = pipeline.apply(
         MqttIO.read()
             .withConnectionConfiguration(
@@ -162,8 +163,6 @@ public class MqttIOTest {
     };
     subscriber.start();
 
-    Pipeline pipeline = TestPipeline.create();
-
     ArrayList<byte[]> data = new ArrayList<>();
     for (int i = 0; i < 200; i++) {
       data.add(("Test " + i).getBytes());

Reply via email to