Tests that can should run with TestSparkRunner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b88e54a9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b88e54a9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b88e54a9 Branch: refs/heads/master Commit: b88e54a9a6b05d25a1f52aa764bd4a802be32b78 Parents: 3867dcd Author: Sela <ans...@paypal.com> Authored: Mon Feb 20 20:41:00 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 00:18:05 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/spark/ForceStreamingTest.java | 12 +++++++----- .../beam/runners/spark/ProvidedSparkContextTest.java | 2 +- .../aggregators/metrics/sink/NamedAggregatorsTest.java | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b88e54a9/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java index 70fcb99..c3026ce 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java @@ -21,13 +21,14 @@ package org.apache.beam.runners.spark; import static org.hamcrest.MatcherAssert.assertThat; import java.io.IOException; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; +import org.junit.Rule; import org.junit.Test; @@ -44,13 +45,14 @@ import org.junit.Test; */ public class ForceStreamingTest { + @Rule + public SparkTestPipelineOptionsForStreaming commonOptions = + new SparkTestPipelineOptionsForStreaming(); + @Test public void test() throws IOException { - SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); - options.setRunner(TestSparkRunner.class); - // force streaming. + SparkPipelineOptions options = commonOptions.getOptions(); options.setForceStreaming(true); - Pipeline pipeline = Pipeline.create(options); // apply the BoundedReadFromUnboundedSource. http://git-wip-us.apache.org/repos/asf/beam/blob/b88e54a9/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index a4190a9..36ba863 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -119,7 +119,7 @@ public class ProvidedSparkContextTest { private static SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - options.setRunner(SparkRunner.class); + options.setRunner(TestSparkRunner.class); options.setUsesProvidedSparkContext(true); options.setProvidedSparkContext(jsc); options.setEnableSparkMetricSinks(false); http://git-wip-us.apache.org/repos/asf/beam/blob/b88e54a9/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index 8646510..2f7202c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -82,7 +82,7 @@ public class NamedAggregatorsTest { PAssert.that(output).containsInAnyOrder(expectedCounts); - pipeline.run().waitUntilFinish(); + pipeline.run(); } @Test