Repository: beam Updated Branches: refs/heads/master 48c8ed176 -> b73918b55
[BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d91a97b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d91a97b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d91a97b Branch: refs/heads/master Commit: 8d91a97b77fbda74c577d2cdbd507395834e147c Parents: 0e2bb18 Author: Aviem Zur <aviem...@gmail.com> Authored: Wed May 3 21:06:00 2017 +0300 Committer: Aviem Zur <aviem...@gmail.com> Committed: Thu May 4 20:48:56 2017 +0300 ---------------------------------------------------------------------- runners/spark/pom.xml | 47 +++++++- .../runners/spark/SparkRunnerRegistrar.java | 4 +- .../apache/beam/runners/spark/CacheTest.java | 12 +- .../beam/runners/spark/ForceStreamingTest.java | 18 +-- .../apache/beam/runners/spark/PipelineRule.java | 109 ------------------- .../runners/spark/ProvidedSparkContextTest.java | 10 +- .../runners/spark/SparkRunnerDebuggerTest.java | 15 +-- .../runners/spark/SparkRunnerRegistrarTest.java | 2 +- .../beam/runners/spark/StreamingTest.java | 23 ++++ .../metrics/sink/SparkMetricsSinkTest.java | 12 +- .../beam/runners/spark/io/AvroPipelineTest.java | 10 +- .../beam/runners/spark/io/NumShardsTest.java | 6 +- .../spark/translation/StorageLevelTest.java | 31 +++++- .../translation/streaming/CreateStreamTest.java | 53 ++++----- .../ResumeFromCheckpointStreamingTest.java | 50 ++++++--- .../streaming/StreamingSourceMetricsTest.java | 14 +-- .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +- .../apache/beam/sdk/testing/TestPipeline.java | 21 +++- 18 files changed, 217 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 
38d250e..f7200d6 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -333,9 +333,6 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> - <excludedGroups> - org.apache.beam.runners.spark.UsesCheckpointRecovery - </excludedGroups> <forkCount>1</forkCount> <reuseForks>false</reuseForks> <systemPropertyVariables> @@ -344,6 +341,50 @@ <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress> </systemPropertyVariables> </configuration> + <executions> + <execution> + <id>default-test</id> + <goals> + <goal>test</goal> + </goals> + <configuration> + <excludedGroups> + org.apache.beam.runners.spark.UsesCheckpointRecovery, + org.apache.beam.runners.spark.StreamingTest + </excludedGroups> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=TestSparkRunner", + "--streaming=false", + "--enableSparkMetricSinks=true" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </execution> + <execution> + <id>streaming-tests</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <groups> + org.apache.beam.runners.spark.StreamingTest + </groups> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=TestSparkRunner", + "--forceStreaming=true", + "--enableSparkMetricSinks=true" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </execution> + </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index bedfda4..bf926dc 100644 --- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -54,7 +54,9 @@ public final class SparkRunnerRegistrar { public static class Options implements PipelineOptionsRegistrar { @Override public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class); + return ImmutableList.<Class<? extends PipelineOptions>>of( + SparkPipelineOptions.class, + TestSparkPipelineOptions.class); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java index c3b48d8..24b2e7b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java @@ -23,11 +23,11 @@ import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaSparkContext; -import org.junit.Rule; import org.junit.Test; /** @@ -36,12 +36,12 @@ import org.junit.Test; */ public class CacheTest { - @Rule - public final transient PipelineRule pipelineRule = PipelineRule.batch(); - @Test public void cacheCandidatesUpdaterTest() throws Exception { - Pipeline pipeline = pipelineRule.createPipeline(); + 
SparkPipelineOptions options = + PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); + options.setRunner(TestSparkRunner.class); + Pipeline pipeline = Pipeline.create(options); PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar")); // first read pCollection.apply(Count.<String>globally()); @@ -50,7 +50,7 @@ public class CacheTest { // will cache the RDD representing this PCollection pCollection.apply(Count.<String>globally()); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions()); + JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); SparkRunner.CacheVisitor cacheVisitor = new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt); http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java index b60faf2..7bfc980 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java @@ -25,9 +25,9 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; -import org.junit.Rule; import org.junit.Test; @@ -44,19 +44,23 @@ import org.junit.Test; */ public class ForceStreamingTest { - @Rule - public final PipelineRule pipelineRule = PipelineRule.streaming(); - @Test public void test() 
throws IOException { - Pipeline pipeline = pipelineRule.createPipeline(); + TestSparkPipelineOptions options = + PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); + options.setRunner(TestSparkRunner.class); + options.setForceStreaming(true); + + // pipeline with a bounded read. + Pipeline pipeline = Pipeline.create(options); // apply the BoundedReadFromUnboundedSource. BoundedReadFromUnboundedSource<?> boundedRead = Read.from(CountingSource.unbounded()).withMaxNumRecords(-1); - //noinspection unchecked pipeline.apply(boundedRead); - TestSparkRunner runner = TestSparkRunner.fromOptions(pipelineRule.getOptions()); + + // adapt reads + TestSparkRunner runner = TestSparkRunner.fromOptions(options); runner.adaptBoundedReads(pipeline); UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector(); http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java deleted file mode 100644 index f8499f3..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.joda.time.Duration; -import org.junit.rules.ExternalResource; -import org.junit.rules.RuleChain; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestName; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -/** - * A {@link org.junit.Rule} to provide a {@link Pipeline} instance for Spark runner tests. - */ -public class PipelineRule implements TestRule { - - private final SparkPipelineRule delegate; - private final RuleChain chain; - - private PipelineRule(SparkPipelineRule delegate) { - TestName testName = new TestName(); - this.delegate = delegate; - this.delegate.setTestName(testName); - this.chain = RuleChain.outerRule(testName).around(this.delegate); - } - - public static PipelineRule streaming() { - return new PipelineRule(new SparkStreamingPipelineRule()); - } - - public static PipelineRule batch() { - return new PipelineRule(new SparkPipelineRule()); - } - - public Duration batchDuration() { - return Duration.millis(delegate.options.getBatchIntervalMillis()); - } - - public TestSparkPipelineOptions getOptions() { - return delegate.options; - } - - public Pipeline createPipeline() { - return Pipeline.create(delegate.options); - } - - @Override - public Statement apply(Statement statement, Description description) { - return chain.apply(statement, description); - } - - private static class 
SparkStreamingPipelineRule extends SparkPipelineRule { - - private final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected void before() throws Throwable { - super.before(); - temporaryFolder.create(); - options.setForceStreaming(true); - options.setCheckpointDir( - temporaryFolder.newFolder(options.getJobName()).toURI().toURL().toString()); - } - - @Override - protected void after() { - temporaryFolder.delete(); - } - } - - private static class SparkPipelineRule extends ExternalResource { - - protected final TestSparkPipelineOptions options = - PipelineOptionsFactory.as(TestSparkPipelineOptions.class); - - private TestName testName; - - public void setTestName(TestName testName) { - this.testName = testName; - } - - @Override - protected void before() throws Throwable { - options.setRunner(TestSparkRunner.class); - options.setEnableSparkMetricSinks(false); - options.setJobName( - testName != null ? testName.getMethodName() : "test-at-" + System.currentTimeMillis()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index 36ba863..8112993 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -27,9 +27,11 @@ import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import 
org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; @@ -95,7 +97,9 @@ public class ProvidedSparkContextTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); // Run test from pipeline - p.run().waitUntilFinish(); + PipelineResult result = p.run(); + + TestPipeline.verifyPAssertsSucceeded(p, result); } private void testWithInvalidContext(JavaSparkContext jsc) { @@ -104,11 +108,9 @@ public class ProvidedSparkContextTest { Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder .of())); - PCollection<String> output = inputWords.apply(new WordCount.CountWords()) + inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); - PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - try { p.run().waitUntilFinish(); fail("Should throw an exception when The provided Spark context is null or stopped"); http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index ea058b2..9009751 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -27,6 +27,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.options.PipelineOptions; +import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Distinct; @@ -48,7 +50,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.hamcrest.Matchers; import org.joda.time.Duration; -import org.junit.Rule; import org.junit.Test; @@ -57,15 +58,9 @@ import org.junit.Test; */ public class SparkRunnerDebuggerTest { - @Rule - public final PipelineRule batchPipelineRule = PipelineRule.batch(); - - @Rule - public final PipelineRule streamingPipelineRule = PipelineRule.streaming(); - @Test public void debugBatchPipeline() { - TestSparkPipelineOptions options = batchPipelineRule.getOptions(); + PipelineOptions options = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); options.setRunner(SparkRunnerDebugger.class); Pipeline pipeline = Pipeline.create(options); @@ -111,7 +106,9 @@ public class SparkRunnerDebuggerTest { @Test public void debugStreamingPipeline() { - TestSparkPipelineOptions options = streamingPipelineRule.getOptions(); + TestSparkPipelineOptions options = + PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); + options.setForceStreaming(true); options.setRunner(SparkRunnerDebugger.class); Pipeline pipeline = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index 4e1fd7c..75899f9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest { @Test public void testOptions() { assertEquals( - ImmutableList.of(SparkPipelineOptions.class), + ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class), new SparkRunnerRegistrar.Options().getPipelineOptions()); } http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java new file mode 100644 index 0000000..a34c184 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark; + +/** + * Category tag for tests that should be run in streaming mode. 
+ */ +public interface StreamingTest {} http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java index b0ad972..fff95cb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -26,11 +26,10 @@ import com.google.common.collect.ImmutableSet; import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.beam.runners.spark.PipelineRule; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; @@ -48,12 +47,7 @@ public class SparkMetricsSinkTest { public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); @Rule - public final PipelineRule pipelineRule = PipelineRule.batch(); - - private Pipeline createSparkPipeline() { - pipelineRule.getOptions().setEnableSparkMetricSinks(true); - return pipelineRule.createPipeline(); - } + public final TestPipeline pipeline = TestPipeline.create(); private void runPipeline() { final List<String> words = @@ -62,8 +56,6 @@ public class SparkMetricsSinkTest { final Set<String> expectedCounts = ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); - final Pipeline pipeline = 
createSparkPipeline(); - final PCollection<String> output = pipeline .apply(Create.of(words).withCoder(StringUtf8Coder.of())) http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 7188dc5..adde8d2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -33,9 +33,8 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.junit.Before; import org.junit.Rule; @@ -54,7 +53,7 @@ public class AvroPipelineTest { public final TemporaryFolder tmpDir = new TemporaryFolder(); @Rule - public final PipelineRule pipelineRule = PipelineRule.batch(); + public final TestPipeline pipeline = TestPipeline.create(); @Before public void setUp() throws IOException { @@ -72,11 +71,10 @@ public class AvroPipelineTest { savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); populateGenericFile(Lists.newArrayList(savedRecord), schema); - Pipeline p = pipelineRule.createPipeline(); - PCollection<GenericRecord> input = p.apply( + PCollection<GenericRecord> input = pipeline.apply( AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath())); - 
p.run().waitUntilFinish(); + pipeline.run(); List<GenericRecord> records = readGenericFile(); assertEquals(Lists.newArrayList(savedRecord), records); http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 5021744..55ee938 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -30,11 +30,10 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.beam.runners.spark.PipelineRule; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; @@ -59,7 +58,7 @@ public class NumShardsTest { public final TemporaryFolder tmpDir = new TemporaryFolder(); @Rule - public final PipelineRule pipelineRule = PipelineRule.batch(); + public final TestPipeline p = TestPipeline.create(); @Before public void setUp() throws IOException { @@ -69,7 +68,6 @@ public class NumShardsTest { @Test public void testText() throws Exception { - Pipeline p = pipelineRule.createPipeline(); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); 
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java index 2b7b87b..8f2e681 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java @@ -15,30 +15,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.beam.runners.spark.translation; -import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; + /** * Test the RDD storage level defined by user. 
*/ public class StorageLevelTest { + private static String beamTestPipelineOptions; + @Rule - public final transient PipelineRule pipelineRule = PipelineRule.batch(); + public final TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void init() { + beamTestPipelineOptions = + System.getProperty(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS); + + System.setProperty( + TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS, + beamTestPipelineOptions.replace("]", ", \"--storageLevel=DISK_ONLY\"]")); + } + + @AfterClass + public static void teardown() { + System.setProperty( + TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS, + beamTestPipelineOptions); + } @Test public void test() throws Exception { - pipelineRule.getOptions().setStorageLevel("DISK_ONLY"); - Pipeline pipeline = pipelineRule.createPipeline(); - PCollection<String> pCollection = pipeline.apply(Create.of("foo")); // by default, the Spark runner doesn't cache the RDD if it accessed only one time. http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index dd52c05..770e0c0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -24,13 +24,14 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.io.Serializable; -import org.apache.beam.runners.spark.PipelineRule; import org.apache.beam.runners.spark.ReuseSparkContextRule; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import 
org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.runners.spark.io.CreateStream; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -61,6 +62,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; @@ -74,10 +76,11 @@ import org.junit.rules.ExpectedException; * {@link org.apache.spark.streaming.dstream.QueueInputDStream} and advance the system's WMs. * //TODO: add synchronized/processing time trigger. */ +@Category(StreamingTest.class) public class CreateStreamTest implements Serializable { @Rule - public final transient PipelineRule pipelineRule = PipelineRule.streaming(); + public final transient TestPipeline p = TestPipeline.create(); @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); @Rule @@ -85,10 +88,9 @@ public class CreateStreamTest implements Serializable { @Test public void testLateDataAccumulating() throws IOException { - Pipeline p = pipelineRule.createPipeline(); Instant instant = new Instant(0); CreateStream<Integer> source = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .emptyBatch() .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6))) .nextBatch( @@ -159,9 +161,8 @@ public class CreateStreamTest implements Serializable { @Test public void testDiscardingMode() throws IOException { - Pipeline p = pipelineRule.createPipeline(); CreateStream<String> source = - CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) + 
CreateStream.of(StringUtf8Coder.of(), batchDuration()) .nextBatch( TimestampedValue.of("firstPane", new Instant(100)), TimestampedValue.of("alsoFirstPane", new Instant(200))) @@ -208,10 +209,9 @@ public class CreateStreamTest implements Serializable { @Test public void testFirstElementLate() throws IOException { - Pipeline p = pipelineRule.createPipeline(); Instant lateElementTimestamp = new Instant(-1_000_000); CreateStream<String> source = - CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) + CreateStream.of(StringUtf8Coder.of(), batchDuration()) .emptyBatch() .advanceWatermarkForNextBatch(new Instant(0)) .nextBatch( @@ -242,10 +242,9 @@ public class CreateStreamTest implements Serializable { @Test public void testElementsAtAlmostPositiveInfinity() throws IOException { - Pipeline p = pipelineRule.createPipeline(); Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); CreateStream<String> source = - CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) + CreateStream.of(StringUtf8Coder.of(), batchDuration()) .nextBatch( TimestampedValue.of("foo", endOfGlobalWindow), TimestampedValue.of("bar", endOfGlobalWindow)) @@ -267,13 +266,12 @@ public class CreateStreamTest implements Serializable { @Test public void testMultipleStreams() throws IOException { - Pipeline p = pipelineRule.createPipeline(); CreateStream<String> source = - CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration()) + CreateStream.of(StringUtf8Coder.of(), batchDuration()) .nextBatch("foo", "bar") .advanceNextBatchWatermarkToInfinity(); CreateStream<Integer> other = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .nextBatch(1, 2, 3, 4) .advanceNextBatchWatermarkToInfinity(); @@ -298,10 +296,9 @@ public class CreateStreamTest implements Serializable { @Test public void testFlattenedWithWatermarkHold() throws IOException { - Pipeline p = pipelineRule.createPipeline(); Instant 
instant = new Instant(0); CreateStream<Integer> source1 = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .emptyBatch() .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5))) .nextBatch( @@ -310,7 +307,7 @@ public class CreateStreamTest implements Serializable { TimestampedValue.of(3, instant)) .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10))); CreateStream<Integer> source2 = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .emptyBatch() .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1))) .nextBatch( @@ -323,14 +320,14 @@ public class CreateStreamTest implements Serializable { .advanceNextBatchWatermarkToInfinity(); PCollection<Integer> windowed1 = p - .apply(source1) - .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) + .apply("CreateStream1", source1) + .apply("Window1", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) .triggering(AfterWatermark.pastEndOfWindow()) .accumulatingFiredPanes() .withAllowedLateness(Duration.ZERO)); PCollection<Integer> windowed2 = p - .apply(source2) - .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) + .apply("CreateStream2", source2) + .apply("Window2", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))) .triggering(AfterWatermark.pastEndOfWindow()) .accumulatingFiredPanes() .withAllowedLateness(Duration.ZERO)); @@ -357,10 +354,9 @@ public class CreateStreamTest implements Serializable { */ @Test public void testMultiOutputParDo() throws IOException { - Pipeline p = pipelineRule.createPipeline(); Instant instant = new Instant(0); CreateStream<Integer> source1 = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .emptyBatch() 
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5))) .nextBatch( @@ -397,7 +393,7 @@ public class CreateStreamTest implements Serializable { @Test public void testElementAtPositiveInfinityThrows() { CreateStream<Integer> source = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .nextBatch(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L))) .advanceNextBatchWatermarkToInfinity(); thrown.expect(IllegalArgumentException.class); @@ -407,7 +403,7 @@ public class CreateStreamTest implements Serializable { @Test public void testAdvanceWatermarkNonMonotonicThrows() { CreateStream<Integer> source = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .advanceWatermarkForNextBatch(new Instant(0L)); thrown.expect(IllegalArgumentException.class); source @@ -418,9 +414,14 @@ public class CreateStreamTest implements Serializable { @Test public void testAdvanceWatermarkEqualToPositiveInfinityThrows() { CreateStream<Integer> source = - CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration()) + CreateStream.of(VarIntCoder.of(), batchDuration()) .advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)); thrown.expect(IllegalArgumentException.class); source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE); } + + private Duration batchDuration() { + return Duration.millis( + (p.getOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis()); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 33571f0..584edac 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -28,15 +28,16 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; +import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.spark.PipelineRule; import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.TestSparkPipelineOptions; +import org.apache.beam.runners.spark.TestSparkRunner; import org.apache.beam.runners.spark.UsesCheckpointRecovery; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.io.MicrobatchSource; @@ -53,6 +54,7 @@ import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -81,11 +83,12 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; - +import org.junit.rules.TemporaryFolder; /** * 
Tests DStream recovery from checkpoint. @@ -96,24 +99,34 @@ import org.junit.experimental.categories.Category; * {@link Metrics} values that are expected to resume from previous count and a side-input that is * expected to recover as well. */ -public class ResumeFromCheckpointStreamingTest { +public class ResumeFromCheckpointStreamingTest implements Serializable { private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER = new EmbeddedKafkaCluster.EmbeddedZookeeper(); private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER = new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties()); private static final String TOPIC = "kafka_beam_test_topic"; + private transient TemporaryFolder temporaryFolder; + @Rule public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no(); - @Rule - public final transient PipelineRule pipelineRule = PipelineRule.streaming(); @BeforeClass - public static void init() throws IOException { + public static void setup() throws IOException { EMBEDDED_ZOOKEEPER.startup(); EMBEDDED_KAFKA_CLUSTER.startup(); } + @Before + public void init() { + temporaryFolder = new TemporaryFolder(); + try { + temporaryFolder.create(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static void produce(Map<String, Instant> messages) { Properties producerProps = new Properties(); producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps()); @@ -148,7 +161,7 @@ public class ResumeFromCheckpointStreamingTest { .build(); // first run should expect EOT matching the last injected element. 
- SparkPipelineResult res = run(pipelineRule, Optional.of(new Instant(400)), 0); + SparkPipelineResult res = run(Optional.of(new Instant(400)), 0); assertThat(res.metrics().queryMetrics(metricsFilter).counters(), hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(), @@ -169,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest { )); // recovery should resume from last read offset, and read the second batch of input. - res = runAgain(pipelineRule, 1); + res = runAgain(1); // assertions 2: assertThat(res.metrics().queryMetrics(metricsFilter).counters(), hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(), @@ -209,18 +222,18 @@ public class ResumeFromCheckpointStreamingTest { String.format("Found %d failed assertions.", failedAssertions), failedAssertions, is(0L)); - } - private SparkPipelineResult runAgain(PipelineRule pipelineRule, int expectedAssertions) { + private SparkPipelineResult runAgain(int expectedAssertions) { // sleep before next run. 
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); - return run(pipelineRule, Optional.<Instant>absent(), expectedAssertions); + return run(Optional.<Instant>absent(), expectedAssertions); } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private static SparkPipelineResult run( - PipelineRule pipelineRule, Optional<Instant> stopWatermarkOption, int expectedAssertions) { + private SparkPipelineResult run( + Optional<Instant> stopWatermarkOption, + int expectedAssertions) { KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read() .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) .withTopics(Collections.singletonList(TOPIC)) @@ -242,15 +255,21 @@ public class ResumeFromCheckpointStreamingTest { } }); - TestSparkPipelineOptions options = pipelineRule.getOptions(); + TestSparkPipelineOptions options = + PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class); options.setSparkMaster("local[*]"); options.setCheckpointDurationMillis(options.getBatchIntervalMillis()); options.setExpectedAssertions(expectedAssertions); + options.setRunner(TestSparkRunner.class); + options.setEnableSparkMetricSinks(false); + options.setForceStreaming(true); + options.setCheckpointDir(temporaryFolder.getRoot().getPath()); // timeout is per execution so it can be injected by the caller. 
if (stopWatermarkOption.isPresent()) { options.setStopPipelineWatermark(stopWatermarkOption.get().getMillis()); } - Pipeline p = pipelineRule.createPipeline(); + + Pipeline p = Pipeline.create(options); PCollection<String> expectedCol = p.apply(Create.of(ImmutableList.of("side1", "side2")).withCoder(StringUtf8Coder.of())); @@ -354,5 +373,4 @@ public class ResumeFromCheckpointStreamingTest { } } } - } http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java index 5a4b1b5..df6027c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java @@ -23,9 +23,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat; import java.io.Serializable; -import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.runners.spark.TestSparkPipelineOptions; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.runners.spark.StreamingTest; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Source; @@ -34,10 +32,11 @@ import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.SourceMetrics; +import org.apache.beam.sdk.testing.TestPipeline; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; - +import 
org.junit.experimental.categories.Category; /** * Verify metrics support for {@link Source Sources} in streaming pipelines. */ public class StreamingSourceMetricsTest implements Serializable { - // Force streaming pipeline using pipeline rule. + // Streaming is forced via the StreamingTest category (see pom configuration). @Rule - public final transient PipelineRule pipelineRule = PipelineRule.streaming(); + public final transient TestPipeline pipeline = TestPipeline.create(); @Test + @Category(StreamingTest.class) public void testUnboundedSourceMetrics() { - TestSparkPipelineOptions options = pipelineRule.getOptions(); - - Pipeline pipeline = Pipeline.create(options); - final long numElements = 1000; pipeline.apply( http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 6b15f0d..6fa7a5a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -460,7 +460,7 @@ public class Pipeline { private Set<String> usedFullNames = new HashSet<>(); private CoderRegistry coderRegistry; private final List<String> unstableNames = new ArrayList<>(); - private final PipelineOptions defaultOptions; + protected final PipelineOptions defaultOptions; protected Pipeline(PipelineOptions options) { this.defaultOptions = options; http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index d8fe51d..2d34b22 100644 ---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -244,8 +245,11 @@ public class TestPipeline extends Pipeline implements TestRule { } } - static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + /** System property used to set {@link TestPipelineOptions}. */ + public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner"; + private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( ObjectMapper.findModules(ReflectHelpers.findClassLoader())); @@ -331,7 +335,7 @@ public class TestPipeline extends Pipeline implements TestRule { try { enforcement.get().beforePipelineExecution(); pipelineResult = super.run(); - verifyPAssertsSucceeded(pipelineResult); + verifyPAssertsSucceeded(this, pipelineResult); } catch (RuntimeException exc) { Throwable cause = exc.getCause(); if (cause instanceof AssertionError) { @@ -377,6 +381,15 @@ public class TestPipeline extends Pipeline implements TestRule { return this; } + /** + * Returns this pipeline's options. + */ + @VisibleForTesting + @Override + public PipelineOptions getOptions() { + return defaultOptions; + } + @Override public String toString() { return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName(); @@ -501,9 +514,9 @@ public class TestPipeline extends Pipeline implements TestRule { * <p>Note this only runs for runners which support Metrics.
Runners which do not should verify * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p> */ - private void verifyPAssertsSucceeded(PipelineResult pipelineResult) { + public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) { if (MetricsEnvironment.isMetricsSupported()) { - long expectedNumberOfAssertions = (long) PAssert.countAsserts(this); + long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline); long successfulAssertions = 0; Iterable<MetricResult<Long>> successCounterResults =