Repository: incubator-beam Updated Branches: refs/heads/master 47646d641 -> 503f26f44
[BEAM-891] fix build occasionally fails on IndexOutOfBoundsException. Moved "TestPipelineOptions#withTmpCheckpointDir" to TestPipelineOptionsForStreaming. Removed an unused member in ProvidedSparkContextTest. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0331dd1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0331dd1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0331dd1c Branch: refs/heads/master Commit: 0331dd1cd75e60484f0b15723e4e7edc280a4d12 Parents: 47646d6 Author: Stas Levin <stasle...@gmail.com> Authored: Thu Nov 10 13:32:51 2016 +0200 Committer: Sela <ans...@paypal.com> Committed: Tue Nov 15 19:52:12 2016 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 3 +- .../runners/spark/SparkPipelineOptions.java | 4 +- .../spark/translation/SparkRuntimeContext.java | 4 +- .../runners/spark/ProvidedSparkContextTest.java | 26 ++++----- .../metrics/sink/NamedAggregatorsTest.java | 13 +++-- .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++--- .../beam/runners/spark/io/NumShardsTest.java | 10 ++-- .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++--- .../spark/translation/SideEffectsTest.java | 11 ++-- .../streaming/EmptyStreamAssertionTest.java | 4 +- .../streaming/FlattenStreamingTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 4 +- .../ResumeFromCheckpointStreamingTest.java | 4 +- .../streaming/SimpleStreamingWordCountTest.java | 6 +-- .../utils/TestOptionsForStreaming.java | 55 -------------------- .../streaming/utils/TestPipelineOptions.java | 25 +++++++++ .../utils/TestPipelineOptionsForStreaming.java | 44 ++++++++++++++++ 17 files changed, 132 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/pom.xml 
---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 1e4a720..4c5b3f5 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -82,7 +82,8 @@ <beamTestPipelineOptions> [ "--runner=TestSparkRunner", - "--streaming=false" + "--streaming=false", + "--enableSparkMetricSinks=false" ] </beamTestPipelineOptions> <beam.spark.test.reuseSparkContext>true</beam.spark.test.reuseSparkContext> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 5168c6c..b1ebde9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -88,8 +88,8 @@ public interface SparkPipelineOptions @Description("Enable/disable sending aggregator values to Spark's metric sinks") @Default.Boolean(true) - Boolean getEnableSparkSinks(); - void setEnableSparkSinks(Boolean enableSparkSinks); + Boolean getEnableSparkMetricSinks(); + void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks); @Description("If the spark runner will be initialized with a provided Spark Context. 
" + "The Spark Context should be provided with SparkContextOptions.") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 181a111..564db39 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -86,11 +86,11 @@ public class SparkRuntimeContext implements Serializable { final Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc); final NamedAggregators initialValue = accum.value(); - if (opts.getEnableSparkSinks()) { + if (opts.getEnableSparkMetricSinks()) { final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem(); final AggregatorMetricSource aggregatorMetricSource = new AggregatorMetricSource(opts.getAppName(), initialValue); - // in case the context was not cleared + // re-register the metrics in case of context re-use metricsSystem.removeSource(aggregatorMetricSource); metricsSystem.registerSource(aggregatorMetricSource); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index c225073..bc337c7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaSparkContext; +import org.junit.Rule; import org.junit.Test; /** @@ -48,6 +50,15 @@ public class ProvidedSparkContextTest { private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped"; + private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) { + final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); + options.setRunner(SparkRunner.class); + options.setUsesProvidedSparkContext(true); + options.setProvidedSparkContext(jsc); + options.setEnableSparkMetricSinks(false); + return options; + } + /** * Provide a context and call pipeline run. 
* @throws Exception @@ -56,10 +67,7 @@ public class ProvidedSparkContextTest { public void testWithProvidedContext() throws Exception { JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - options.setRunner(SparkRunner.class); - options.setUsesProvidedSparkContext(true); - options.setProvidedSparkContext(jsc); + SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder @@ -83,10 +91,7 @@ public class ProvidedSparkContextTest { public void testWithNullContext() throws Exception { JavaSparkContext jsc = null; - SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - options.setRunner(SparkRunner.class); - options.setUsesProvidedSparkContext(true); - options.setProvidedSparkContext(jsc); + SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder @@ -114,10 +119,7 @@ public class ProvidedSparkContextTest { // Stop the provided Spark context directly jsc.stop(); - SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); - options.setRunner(SparkRunner.class); - options.setUsesProvidedSparkContext(true); - options.setProvidedSparkContext(jsc); + SparkContextOptions options = getSparkContextOptions(jsc); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index 194d66a..c220f2b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -27,11 +27,10 @@ import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; @@ -52,9 +51,12 @@ public class NamedAggregatorsTest { @Rule public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule(); + @Rule + public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + private Pipeline createSparkPipeline() { - final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkRunner.class); + SparkPipelineOptions options = pipelineOptions.getOptions(); + options.setEnableSparkMetricSinks(true); return Pipeline.create(options); } @@ -82,6 +84,9 @@ public class NamedAggregatorsTest { @Test public void testNamedAggregators() throws Exception { + // don't reuse context in this test, as it tends to mess up Spark's MetricsSystem thread-safety + System.setProperty("beam.spark.test.reuseSparkContext", "false"); + assertThat(InMemoryMetrics.valueOf("emptyLines"), 
is(nullValue())); runPipeline(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index fc53dbd..396a30d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -33,11 +33,10 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.PCollection; import org.junit.Before; import org.junit.Rule; @@ -55,6 +54,9 @@ public class AvroPipelineTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @Rule + public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + @Before public void setUp() throws IOException { inputFile = tmpDir.newFile("test.avro"); @@ -71,9 +73,7 @@ public class AvroPipelineTest { savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); populateGenericFile(Lists.newArrayList(savedRecord), schema); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(SparkRunner.class); - Pipeline p = Pipeline.create(options); + Pipeline p = 
Pipeline.create(pipelineOptions.getOptions()); PCollection<GenericRecord> input = p.apply( AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 0ff30ac..922046c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -31,12 +31,11 @@ import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; @@ -60,6 +59,9 @@ public class NumShardsTest { @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); + @Rule + public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + @Before public void setUp() throws IOException { outputDir = tmpDir.newFolder("out"); @@ -68,9 +70,7 @@ public class NumShardsTest { @Test public void testText() throws Exception { - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - 
options.setRunner(SparkRunner.class); - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(pipelineOptions.getOptions()); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index aa1e1ce..628d8b6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -22,12 +22,11 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; -import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.WritableCoder; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.hadoop.conf.Configuration; @@ -54,6 +53,9 @@ public class HadoopFileFormatPipelineTest { private File outputFile; @Rule + public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + + @Rule public final TemporaryFolder tmpDir = new 
TemporaryFolder(); @Before @@ -67,9 +69,7 @@ public class HadoopFileFormatPipelineTest { public void testSequenceFile() throws Exception { populateFile(); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(SparkRunner.class); - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(pipelineOptions.getOptions()); @SuppressWarnings("unchecked") Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = (Class<? extends FileInputFormat<IntWritable, Text>>) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 1304e12..7d39d89 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -25,13 +25,13 @@ import static org.junit.Assert.fail; import java.io.Serializable; import java.net.URI; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringDelegateCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.junit.Rule; import org.junit.Test; /** @@ -42,11 +42,12 @@ public class SideEffectsTest implements Serializable { static class UserException extends RuntimeException { } + @Rule + public transient final TestPipelineOptions 
pipelineOptions = new TestPipelineOptions(); + @Test public void test() throws Exception { - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkRunner.class); - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(pipelineOptions.getOptions()); p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java index 3e95b4d..2a38e30 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java @@ -25,7 +25,7 @@ import java.util.Collections; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -51,7 +51,7 @@ public class EmptyStreamAssertionTest implements Serializable { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); + public 
TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); @Test public void testAssertion() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 319b5e9..bd544e9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Create; @@ -57,7 +57,7 @@ public class FlattenStreamingTest { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); + public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); @Test public void testFlattenUnbounded() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 29e4609..5841331 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -28,7 +28,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -71,7 +71,7 @@ public class KafkaStreamingTest { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); + public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); @Test public void testEarliest2Topics() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 34ffbe2..e345831 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -32,7 +32,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; @@ -80,7 +80,7 @@ public class ResumeFromCheckpointStreamingTest { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); + public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); @BeforeClass public static void init() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index edba507..bdfc24f 100644 --- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -25,7 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.MapElements; @@ -47,7 +47,7 @@ public class SimpleStreamingWordCountTest implements Serializable { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); + public TestPipelineOptionsForStreaming pipelineOptions = new TestPipelineOptionsForStreaming(); private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; @@ -62,7 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable { @Test public void testFixedWindows() throws Exception { - SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + SparkPipelineOptions options = pipelineOptions.withTmpCheckpointDir(checkpointParentDir); // override defaults options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java deleted file mode 100644 index 2861d9f..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation.streaming.utils; - - -import java.io.IOException; -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.rules.ExternalResource; -import org.junit.rules.TemporaryFolder; - - - -/** - * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines. 
- */ -public class TestOptionsForStreaming extends ExternalResource { - private final SparkPipelineOptions options = - PipelineOptionsFactory.as(SparkPipelineOptions.class); - - @Override - protected void before() throws Throwable { - options.setRunner(SparkRunner.class); - options.setStreaming(true); - options.setTimeout(1000L); - } - - public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent) - throws IOException { - // tests use JUnit's TemporaryFolder path in the form of: /.../junit/... - options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString()); - return options; - } - - public SparkPipelineOptions getOptions() { - return options; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java new file mode 100644 index 0000000..ccfb29e --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java @@ -0,0 +1,25 @@ +package org.apache.beam.runners.spark.translation.streaming.utils; + +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.rules.ExternalResource; + +/** + * A rule to create common {@link SparkPipelineOptions} test options for the Spark runner. 
+ */ +public class TestPipelineOptions extends ExternalResource { + + protected final SparkPipelineOptions options = + PipelineOptionsFactory.as(SparkPipelineOptions.class); + + @Override + protected void before() throws Throwable { + options.setRunner(SparkRunner.class); + options.setEnableSparkMetricSinks(false); + } + + public SparkPipelineOptions getOptions() { + return options; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java new file mode 100644 index 0000000..3d178ae --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.translation.streaming.utils; + +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.junit.rules.TemporaryFolder; +import java.io.IOException; + + +/** + * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines. + */ +public class TestPipelineOptionsForStreaming extends TestPipelineOptions { + + @Override + protected void before() throws Throwable { + super.before(); + options.setTimeout(1000L); + options.setStreaming(true); + } + + public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent) + throws IOException { + // tests use JUnit's TemporaryFolder path in the form of: /.../junit/... + options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString()); + return options; + } +}