Tests that can, should run with TestSparkRunner.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b88e54a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b88e54a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b88e54a9

Branch: refs/heads/master
Commit: b88e54a9a6b05d25a1f52aa764bd4a802be32b78
Parents: 3867dcd
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 20 20:41:00 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:05 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/ForceStreamingTest.java   | 12 +++++++-----
 .../beam/runners/spark/ProvidedSparkContextTest.java    |  2 +-
 .../aggregators/metrics/sink/NamedAggregatorsTest.java  |  2 +-
 3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b88e54a9/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index 70fcb99..c3026ce 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -21,13 +21,14 @@ package org.apache.beam.runners.spark;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.io.IOException;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.junit.Rule;
 import org.junit.Test;
 
 
@@ -44,13 +45,14 @@ import org.junit.Test;
  */
 public class ForceStreamingTest {
 
+  @Rule
+  public SparkTestPipelineOptionsForStreaming commonOptions =
+      new SparkTestPipelineOptionsForStreaming();
+
   @Test
   public void test() throws IOException {
-    SparkPipelineOptions options = 
PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
-    // force streaming.
+    SparkPipelineOptions options = commonOptions.getOptions();
     options.setForceStreaming(true);
-
     Pipeline pipeline = Pipeline.create(options);
 
     // apply the BoundedReadFromUnboundedSource.

http://git-wip-us.apache.org/repos/asf/beam/blob/b88e54a9/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index a4190a9..36ba863 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -119,7 +119,7 @@ public class ProvidedSparkContextTest {
 
     private static SparkContextOptions getSparkContextOptions(JavaSparkContext 
jsc) {
         final SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
-        options.setRunner(SparkRunner.class);
+        options.setRunner(TestSparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);
         options.setEnableSparkMetricSinks(false);

http://git-wip-us.apache.org/repos/asf/beam/blob/b88e54a9/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 8646510..2f7202c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -82,7 +82,7 @@ public class NamedAggregatorsTest {
 
     PAssert.that(output).containsInAnyOrder(expectedCounts);
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
   }
 
   @Test

Reply via email to