Repository: beam Updated Branches: refs/heads/master 5fe78440b -> f03f6ac19
[BEAM-1205] Auto set "enableAbandonedNodeEnforcement" in TestPipeline Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50daea28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50daea28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50daea28 Branch: refs/heads/master Commit: 50daea288b9c5df2b481e5e6bea153796c03830a Parents: 5fe7844 Author: Stas Levin <stasle...@gmail.com> Authored: Thu Dec 22 19:13:01 2016 +0200 Committer: Stas Levin <stasle...@apache.org> Committed: Thu Feb 16 11:18:09 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/sdk/testing/Annotations.java | 72 +++ .../apache/beam/sdk/testing/TestPipeline.java | 66 ++- .../apache/beam/sdk/metrics/MetricsTest.java | 6 +- .../beam/sdk/testing/TestPipelineTest.java | 504 +++++++++++-------- .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 9 +- 5 files changed, 417 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java new file mode 100644 index 0000000..e560226 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.beam.sdk.testing; + +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import java.lang.annotation.Annotation; +import java.util.Arrays; +import javax.annotation.Nonnull; +import org.junit.experimental.categories.Category; + +/** + * A utility class for querying annotations. + */ +class Annotations { + + /** + * Annotation predicates. + */ + static class Predicates { + + static Predicate<Annotation> isAnnotationOfType(final Class<? extends Annotation> clazz) { + return new Predicate<Annotation>() { + + @Override + public boolean apply(@Nonnull final Annotation annotation) { + return annotation.annotationType() != null + && annotation.annotationType().equals(clazz); + } + }; + } + + static Predicate<Annotation> isCategoryOf(final Class<?> value, final boolean allowDerived) { + return new Predicate<Annotation>() { + + @Override + public boolean apply(@Nonnull final Annotation category) { + return + FluentIterable + .from(Arrays.asList(((Category) category).value())) + .anyMatch(new Predicate<Class<?>>() { + + @Override + public boolean apply(final Class<?> aClass) { + return + allowDerived + ? value.isAssignableFrom(aClass) + : value.equals(aClass); + } + }); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 37c809a..02eefa9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.testing; +import static com.google.common.base.Preconditions.checkState; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.JsonNode; @@ -93,15 +95,18 @@ public class TestPipeline extends Pipeline implements TestRule { private static class PipelineRunEnforcement { + @SuppressWarnings("WeakerAccess") protected boolean enableAutoRunIfMissing; + protected final Pipeline pipeline; + private boolean runInvoked; private PipelineRunEnforcement(final Pipeline pipeline) { this.pipeline = pipeline; } - private void enableAutoRunIfMissing(final boolean enable) { + protected void enableAutoRunIfMissing(final boolean enable) { enableAutoRunIfMissing = enable; } @@ -156,6 +161,12 @@ public class TestPipeline extends Pipeline implements TestRule { return nodeRecorder.visited; } + private boolean isEmptyPipeline(final Pipeline pipeline) { + final IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor(); + pipeline.traverseTopologically(isEmptyVisitor); + return isEmptyVisitor.isEmpty(); + } + private void verifyPipelineExecution() { final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline); if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) { @@ -169,11 +180,11 @@ public class TestPipeline extends Pipeline implements TestRule { throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); } } else if (runVisitedNodes == null && !enableAutoRunIfMissing) { - IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor(); - pipeline.traverseTopologically(isEmptyVisitor); - - if (!isEmptyVisitor.isEmpty()) { - throw new PipelineRunMissingException("The pipeline has not been run."); + if (!isEmptyPipeline(pipeline)) { + throw new PipelineRunMissingException( + "The pipeline has not been run (runner: " + + pipeline.getOptions().getRunner().getSimpleName() + + ")"); } } } @@ -214,7 +225,8 @@ public class TestPipeline extends Pipeline implements TestRule { static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner"; private static final ObjectMapper MAPPER = new ObjectMapper(); - private PipelineRunEnforcement enforcement = new PipelineAbandonedNodeEnforcement(this); + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private Optional<? extends PipelineRunEnforcement> enforcement = Optional.absent(); /** * Creates and returns a new test pipeline. @@ -239,10 +251,35 @@ public class TestPipeline extends Pipeline implements TestRule { public Statement apply(final Statement statement, final Description description) { return new Statement() { + private void setDeducedEnforcementLevel() { + // if the enforcement level has not been set by the user do auto-inference + if (!enforcement.isPresent()) { + + final boolean annotatedWithNeedsRunner = + FluentIterable.from(description.getAnnotations()) + .filter(Annotations.Predicates.isAnnotationOfType(Category.class)) + .anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true)); + + final boolean crashingRunner = + CrashingRunner.class.isAssignableFrom(getOptions().getRunner()); + + checkState( + !(annotatedWithNeedsRunner && crashingRunner), + "The test was annotated with a [@%s] / [@%s] while the runner " + + "was set to [%s]. Please re-check your configuration.", + NeedsRunner.class.getSimpleName(), + RunnableOnService.class.getSimpleName(), + CrashingRunner.class.getSimpleName()); + + enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner); + } + } + @Override public void evaluate() throws Throwable { + setDeducedEnforcementLevel(); statement.evaluate(); - enforcement.afterTestCompletion(); + enforcement.get().afterTestCompletion(); } }; } @@ -253,6 +290,11 @@ public class TestPipeline extends Pipeline implements TestRule { */ @Override public PipelineResult run() { + checkState( + enforcement.isPresent(), + "Attempted to run a pipeline while it's enforcement level was not set. Are you " + + "using TestPipeline without a @Rule annotation?"); + try { return super.run(); } catch (RuntimeException exc) { @@ -263,19 +305,21 @@ public class TestPipeline extends Pipeline implements TestRule { throw exc; } } finally { - enforcement.afterPipelineExecution(); + enforcement.get().afterPipelineExecution(); } } public TestPipeline enableAbandonedNodeEnforcement(final boolean enable) { enforcement = - enable ? new PipelineAbandonedNodeEnforcement(this) : new PipelineRunEnforcement(this); + enable + ? Optional.of(new PipelineAbandonedNodeEnforcement(this)) + : Optional.of(new PipelineRunEnforcement(this)); return this; } public TestPipeline enableAutoRunIfMissing(final boolean enable) { - enforcement.enableAutoRunIfMissing(enable); + enforcement.get().enableAutoRunIfMissing(enable); return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index dd75e58..fc9e18b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -28,7 +28,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -41,6 +40,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.CoreMatchers; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -53,6 +53,9 @@ public class MetricsTest implements Serializable { private static final String NAME = "name"; private static final MetricName METRIC_NAME = MetricName.named(NS, NAME); + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @After public void tearDown() { MetricsEnvironment.setCurrentContainer(null); @@ -168,7 +171,6 @@ public class MetricsTest implements Serializable { private PipelineResult runPipelineWithMetrics() { final Counter count = Metrics.counter(MetricsTest.class, "count"); - Pipeline pipeline = TestPipeline.create(); final TupleTag<Integer> output1 = new TupleTag<Integer>(){}; final TupleTag<Integer> output2 = new TupleTag<Integer>(){}; pipeline http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index f484566..1a7d375 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.beam.sdk.testing; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -24,308 +25,367 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.UUID; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; -import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.rules.RuleChain; import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Suite; /** Tests for {@link TestPipeline}. */ -@RunWith(JUnit4.class) +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TestPipelineTest.TestPipelineCreationTest.class, + TestPipelineTest.TestPipelineEnforcementsTest.WithRealPipelineRunner.class, + TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class +}) public class TestPipelineTest implements Serializable { - private static final List<String> WORDS = Collections.singletonList("hi there"); - private static final String DUMMY = "expected"; - - private final transient TestPipeline pipeline = - TestPipeline.fromOptions(pipelineOptions()).enableAbandonedNodeEnforcement(true); - private final transient ExpectedException exception = ExpectedException.none(); - - @Rule public transient TestRule restoreSystemProperties = new RestoreSystemProperties(); - @Rule public transient ExpectedException thrown = ExpectedException.none(); - @Rule public transient RuleChain ruleOrder = RuleChain.outerRule(exception).around(pipeline); + /** Tests related to the creation of a {@link TestPipeline}. */ + @RunWith(JUnit4.class) + public static class TestPipelineCreationTest { + @Rule public transient TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testCreationUsingDefaults() { + assertNotNull(pipeline); + assertNotNull(TestPipeline.create()); + } - @Test - public void testNoTestPipelineUsed() { } + @Test + public void testCreationNotAsTestRule() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("@Rule"); - @Test - public void testCreationUsingDefaults() { - assertNotNull(TestPipeline.create()); - } + TestPipeline.create().run(); + } - @Test - public void testCreationOfPipelineOptions() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - String stringOptions = - mapper.writeValueAsString( - new String[] { - "--runner=org.apache.beam.sdk.testing.CrashingRunner", "--project=testProject" - }); - System.getProperties().put("beamTestPipelineOptions", stringOptions); - GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); - assertEquals(CrashingRunner.class, options.getRunner()); - assertEquals(options.getProject(), "testProject"); - } + @Test + public void testCreationOfPipelineOptions() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + String stringOptions = + mapper.writeValueAsString( + new String[] { + "--runner=org.apache.beam.sdk.testing.CrashingRunner", "--project=testProject" + }); + System.getProperties().put("beamTestPipelineOptions", stringOptions); + GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); + assertEquals(CrashingRunner.class, options.getRunner()); + assertEquals(options.getProject(), "testProject"); + } - @Test - public void testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase() throws Exception { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - assertThat( - options.as(ApplicationNameOptions.class).getAppName(), - startsWith( - "TestPipelineTest-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase")); - } + @Test + public void testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase() throws Exception { + PipelineOptions options = TestPipeline.testingPipelineOptions(); + assertThat( + options.as(ApplicationNameOptions.class).getAppName(), + startsWith( + "TestPipelineTest$TestPipelineCreationTest" + + "-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase")); + } - @Test - public void testToString() { - assertEquals("TestPipeline#TestPipelineTest-testToString", TestPipeline.create().toString()); - } + @Test + public void testToString() { + assertEquals( + "TestPipeline#TestPipelineTest$TestPipelineCreationTest-testToString", + TestPipeline.create().toString()); + } - @Test - public void testToStringNestedMethod() { - TestPipeline p = nestedMethod(); + @Test + public void testToStringNestedMethod() { + TestPipeline p = nestedMethod(); - assertEquals("TestPipeline#TestPipelineTest-testToStringNestedMethod", p.toString()); - assertEquals( - "TestPipelineTest-testToStringNestedMethod", - p.getOptions().as(ApplicationNameOptions.class).getAppName()); - } + assertEquals( + "TestPipeline#TestPipelineTest$TestPipelineCreationTest-testToStringNestedMethod", + p.toString()); + assertEquals( + "TestPipelineTest$TestPipelineCreationTest-testToStringNestedMethod", + p.getOptions().as(ApplicationNameOptions.class).getAppName()); + } - private TestPipeline nestedMethod() { - return TestPipeline.create(); - } + private TestPipeline nestedMethod() { + return TestPipeline.create(); + } - @Test - public void testConvertToArgs() { - String[] args = new String[] {"--tempLocation=Test_Location"}; - PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); - String[] arr = TestPipeline.convertToArgs(options); - List<String> lst = Arrays.asList(arr); - assertEquals(lst.size(), 2); - assertThat( - lst, containsInAnyOrder("--tempLocation=Test_Location", "--appName=TestPipelineTest")); - } + @Test + public void testConvertToArgs() { + String[] args = new String[] {"--tempLocation=Test_Location"}; + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); + String[] arr = TestPipeline.convertToArgs(options); + List<String> lst = Arrays.asList(arr); + assertEquals(lst.size(), 2); + assertThat( + lst, + containsInAnyOrder("--tempLocation=Test_Location", "--appName=TestPipelineCreationTest")); + } - @Test - public void testToStringNestedClassMethod() { - TestPipeline p = new NestedTester().p(); + @Test + public void testToStringNestedClassMethod() { + TestPipeline p = new NestedTester().p(); - assertEquals("TestPipeline#TestPipelineTest-testToStringNestedClassMethod", p.toString()); - assertEquals( - "TestPipelineTest-testToStringNestedClassMethod", - p.getOptions().as(ApplicationNameOptions.class).getAppName()); - } + assertEquals( + "TestPipeline#TestPipelineTest$TestPipelineCreationTest-testToStringNestedClassMethod", + p.toString()); + assertEquals( + "TestPipelineTest$TestPipelineCreationTest-testToStringNestedClassMethod", + p.getOptions().as(ApplicationNameOptions.class).getAppName()); + } - private static class NestedTester { - public TestPipeline p() { - return TestPipeline.create(); + private static class NestedTester { + public TestPipeline p() { + return TestPipeline.create(); + } } - } - @Test - public void testMatcherSerializationDeserialization() { - TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class); - SerializableMatcher<PipelineResult> m1 = new TestMatcher(); - SerializableMatcher<PipelineResult> m2 = new TestMatcher(); + @Test + public void testMatcherSerializationDeserialization() { + TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class); + SerializableMatcher<PipelineResult> m1 = new TestMatcher(); + SerializableMatcher<PipelineResult> m2 = new TestMatcher(); - opts.setOnCreateMatcher(m1); - opts.setOnSuccessMatcher(m2); + opts.setOnCreateMatcher(m1); + opts.setOnSuccessMatcher(m2); - String[] arr = TestPipeline.convertToArgs(opts); - TestPipelineOptions newOpts = - PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class); + String[] arr = TestPipeline.convertToArgs(opts); + TestPipelineOptions newOpts = + PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class); - assertEquals(m1, newOpts.getOnCreateMatcher()); - assertEquals(m2, newOpts.getOnSuccessMatcher()); - } + assertEquals(m1, newOpts.getOnCreateMatcher()); + assertEquals(m2, newOpts.getOnSuccessMatcher()); + } - @Test - public void testRunWithDummyEnvironmentVariableFails() { - System.getProperties() - .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true)); - TestPipeline pipeline = TestPipeline.create(); - pipeline.apply(Create.of(1, 2, 3)); + @Test + public void testRunWithDummyEnvironmentVariableFails() { + System.getProperties() + .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true)); + pipeline.apply(Create.of(1, 2, 3)); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Cannot call #run"); - pipeline.run(); - } + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot call #run"); + pipeline.run(); + } - /** TestMatcher is a matcher designed for testing matcher serialization/deserialization. */ - public static class TestMatcher extends BaseMatcher<PipelineResult> - implements SerializableMatcher<PipelineResult> { - private final UUID uuid = UUID.randomUUID(); + /** TestMatcher is a matcher designed for testing matcher serialization/deserialization. */ + public static class TestMatcher extends BaseMatcher<PipelineResult> + implements SerializableMatcher<PipelineResult> { + private final UUID uuid = UUID.randomUUID(); - @Override - public boolean matches(Object o) { - return true; - } + @Override + public boolean matches(Object o) { + return true; + } - @Override - public void describeTo(Description description) { - description.appendText(String.format("%tL", new Date())); - } + @Override + public void describeTo(Description description) { + description.appendText(String.format("%tL", new Date())); + } - @Override - public boolean equals(Object obj) { - if (!(obj instanceof TestMatcher)) { - return false; + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TestMatcher)) { + return false; + } + TestMatcher other = (TestMatcher) obj; + return other.uuid.equals(uuid); } - TestMatcher other = (TestMatcher) obj; - return other.uuid.equals(uuid); - } - @Override - public int hashCode() { - return uuid.hashCode(); + @Override + public int hashCode() { + return uuid.hashCode(); + } } } - private static class DummyRunner extends PipelineRunner<PipelineResult> { + /** + * Tests for {@link TestPipeline}'s detection of missing {@link Pipeline#run()}, or abandoned + * (dangling) {@link PAssert} or {@link org.apache.beam.sdk.transforms.PTransform} nodes. + */ + public static class TestPipelineEnforcementsTest implements Serializable { + + private static final List<String> WORDS = Collections.singletonList("hi there"); + private static final String WHATEVER = "expected"; + private static final String P_TRANSFORM = "PTransform"; + private static final String P_ASSERT = "PAssert"; + + @SuppressWarnings("UnusedReturnValue") + private static PCollection<String> addTransform(final PCollection<String> pCollection) { + return pCollection.apply( + "Map2", + MapElements.via( + new SimpleFunction<String, String>() { + + @Override + public String apply(final String input) { + return WHATEVER; + } + })); + } - @SuppressWarnings("unused") // used by reflection - public static DummyRunner fromOptions(final PipelineOptions opts) { - return new DummyRunner(); + private static PCollection<String> pCollection(final Pipeline pipeline) { + return pipeline + .apply("Create", Create.of(WORDS).withCoder(StringUtf8Coder.of())) + .apply( + "Map1", + MapElements.via( + new SimpleFunction<String, String>() { + + @Override + public String apply(final String input) { + return WHATEVER; + } + })); } - @Override - public PipelineResult run(final Pipeline pipeline) { - return new PipelineResult() { + /** Tests for {@link TestPipeline}s with a non {@link CrashingRunner}. */ + @RunWith(JUnit4.class) + public static class WithRealPipelineRunner { - @Override - public State getState() { - return null; - } + private final transient ExpectedException exception = ExpectedException.none(); - @Override - public State cancel() throws IOException { - return null; - } + private final transient TestPipeline pipeline = TestPipeline.create(); - @Override - public State waitUntilFinish(final Duration duration) { - return null; - } + @Rule + public final transient RuleChain chain = RuleChain.outerRule(exception).around(pipeline); - @Override - public State waitUntilFinish() { - return null; - } + @Category(RunnableOnService.class) + @Test + public void testNormalFlow() throws Exception { + addTransform(pCollection(pipeline)); + pipeline.run(); + } - @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - return null; - } + @Category(RunnableOnService.class) + @Test + public void testMissingRun() throws Exception { + exception.expect(TestPipeline.PipelineRunMissingException.class); + addTransform(pCollection(pipeline)); + } - @Override - public MetricResults metrics() { - return null; - } - }; - } - } + @Category(RunnableOnService.class) + @Test + public void testMissingRunWithDisabledEnforcement() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + addTransform(pCollection(pipeline)); - private static PipelineOptions pipelineOptions() { - final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); - pipelineOptions.setRunner(DummyRunner.class); - return pipelineOptions; - } + // disable abandoned node detection + } - private PCollection<String> pCollection() { - return addTransform(pipeline.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()))); - } + @Category(RunnableOnService.class) + @Test + public void testMissingRunAutoAdd() throws Exception { + pipeline.enableAutoRunIfMissing(true); + addTransform(pCollection(pipeline)); - private PCollection<String> addTransform(final PCollection<String> pCollection) { - return pCollection.apply( - MapElements.via( - new SimpleFunction<String, String>() { + // have the pipeline.run() auto-added + } - @Override - public String apply(final String input) { - return DUMMY; - } - })); - } + @Category(RunnableOnService.class) + @Test + public void testDanglingPTransformRunnableOnService() throws Exception { + final PCollection<String> pCollection = pCollection(pipeline); + PAssert.that(pCollection).containsInAnyOrder(WHATEVER); + pipeline.run().waitUntilFinish(); + + exception.expect(TestPipeline.AbandonedNodeException.class); + exception.expectMessage(P_TRANSFORM); + // dangling PTransform + addTransform(pCollection); + } - @Test - public void testPipelineRunMissing() throws Throwable { - exception.expect(TestPipeline.PipelineRunMissingException.class); - PAssert.that(pCollection()).containsInAnyOrder(DUMMY); - // missing pipeline#run - } + @Category(NeedsRunner.class) + @Test + public void testDanglingPTransformNeedsRunner() throws Exception { + final PCollection<String> pCollection = pCollection(pipeline); + PAssert.that(pCollection).containsInAnyOrder(WHATEVER); + pipeline.run().waitUntilFinish(); + + exception.expect(TestPipeline.AbandonedNodeException.class); + exception.expectMessage(P_TRANSFORM); + // dangling PTransform + addTransform(pCollection); + } + + @Category(RunnableOnService.class) + @Test + public void testDanglingPAssertRunnableOnService() throws Exception { + final PCollection<String> pCollection = pCollection(pipeline); + PAssert.that(pCollection).containsInAnyOrder(WHATEVER); + pipeline.run().waitUntilFinish(); + + exception.expect(TestPipeline.AbandonedNodeException.class); + exception.expectMessage(P_ASSERT); + // dangling PAssert + PAssert.that(pCollection).containsInAnyOrder(WHATEVER); + } - @Test - public void testPipelineHasAbandonedPAssertNode() throws Throwable { - exception.expect(TestPipeline.AbandonedNodeException.class); - exception.expectMessage("PAssert"); + /** + * Tests that a {@link TestPipeline} rule behaves as expected when there is no pipeline usage + * within a test that has a {@link RunnableOnService} annotation. + */ + @Category(RunnableOnService.class) + @Test + public void testNoTestPipelineUsedRunnableOnService() {} + + /** + * Tests that a {@link TestPipeline} rule behaves as expected when there is no pipeline usage + * present in a test. + */ + @Test + public void testNoTestPipelineUsedNoAnnotation() {} + } - final PCollection<String> pCollection = pCollection(); - PAssert.that(pCollection).containsInAnyOrder(DUMMY); - pipeline.run().waitUntilFinish(); + /** Tests for {@link TestPipeline}s with a {@link CrashingRunner}. */ + @RunWith(JUnit4.class) + public static class WithCrashingPipelineRunner { - // dangling PAssert - PAssert.that(pCollection).containsInAnyOrder(DUMMY); - } + static { + System.setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.TRUE.toString()); + } - @Test - public void testPipelineHasAbandonedPTransformNode() throws Throwable { - exception.expect(TestPipeline.AbandonedNodeException.class); - exception.expectMessage("PTransform"); + private final transient ExpectedException exception = ExpectedException.none(); - final PCollection<String> pCollection = pCollection(); - PAssert.that(pCollection).containsInAnyOrder(DUMMY); - pipeline.run().waitUntilFinish(); + private final transient TestPipeline pipeline = TestPipeline.create(); - // dangling PTransform - addTransform(pCollection); - } + @Rule + public final transient RuleChain chain = RuleChain.outerRule(exception).around(pipeline); - @Test - public void testNormalFlowWithPAssert() throws Throwable { - PAssert.that(pCollection()).containsInAnyOrder(DUMMY); - pipeline.run().waitUntilFinish(); - } + @Test + public void testNoTestPipelineUsed() {} - @Test - public void testAutoAddMissingRunFlow() throws Throwable { - PAssert.that(pCollection()).containsInAnyOrder(DUMMY); - // missing pipeline#run, but have it auto-added. - pipeline.enableAutoRunIfMissing(true); - } + @Test + public void testMissingRun() throws Exception { + addTransform(pCollection(pipeline)); - @Test - public void testDisableStrictPAssertFlow() throws Throwable { - pCollection(); - // dangling PTransform, but ignore it - pipeline.enableAbandonedNodeEnforcement(false); + // pipeline.run() is missing, BUT: + // 1. Neither @RunnableOnService nor @NeedsRunner are present, AND + // 2. The runner class is CrashingRunner.class + // (1) + (2) => we assume this pipeline was never meant to be run, so no exception is + // thrown on account of the missing run / dangling nodes. + } + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index be8fbc7..8a82f40 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -27,7 +27,6 @@ import java.util.Set; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -41,6 +40,7 @@ import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -57,6 +57,9 @@ public class MqttIOTest { private static int port; + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @Before public void startBroker() throws Exception { LOG.info("Finding free network port"); @@ -77,8 +80,6 @@ public class MqttIOTest { @Test(timeout = 60 * 1000) @Category(RunnableOnService.class) public void testRead() throws Exception { - final Pipeline pipeline = TestPipeline.create(); - PCollection<byte[]> output = pipeline.apply( MqttIO.read() .withConnectionConfiguration( @@ -162,8 +163,6 @@ public class MqttIOTest { }; subscriber.start(); - Pipeline pipeline = TestPipeline.create(); - ArrayList<byte[]> data = new ArrayList<>(); for (int i = 0; i < 200; i++) { data.add(("Test " + i).getBytes());