Repository: incubator-beam Updated Branches: refs/heads/master 8dc9032ce -> 96d324e39
Remove the DirectPipeline class Users who wish to use the DirectPipelineRunner should do so by creating a new Pipeline with the runner set in the PipelineOptions. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a44d126 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a44d126 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a44d126 Branch: refs/heads/master Commit: 5a44d126982d30feec0ba7dea0e4d934494af235 Parents: 8dc9032 Author: Thomas Groh <tg...@google.com> Authored: Thu Apr 14 16:12:23 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Apr 15 09:39:33 2016 -0700 ---------------------------------------------------------------------- .../BlockingDataflowPipelineRunnerTest.java | 20 +++---- .../translation/TransformTranslatorTest.java | 51 ++++++------------ .../beam/sdk/options/DirectPipelineOptions.java | 9 ++-- .../apache/beam/sdk/runners/DirectPipeline.java | 56 -------------------- .../beam/sdk/runners/DirectPipelineRunner.java | 15 ------ .../beam/sdk/io/AvroIOGeneratedClassTest.java | 13 +++-- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 9 ++-- .../sdk/runners/DirectPipelineRunnerTest.java | 29 ++++++---- .../beam/sdk/runners/DirectPipelineTest.java | 35 ------------ .../beam/sdk/runners/TransformTreeTest.java | 7 +-- .../transforms/ApproximateQuantilesTest.java | 5 +- 11 files changed, 66 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java index 67ecdbe..ae504ed 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestDataflowPipelineOptions; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.MonitoringUtil; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.TestCredential; @@ -209,7 +210,7 @@ public class BlockingDataflowPipelineRunnerTest { @Test public void testJobDoneComplete() throws Exception { createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE)) - .run(DirectPipeline.createForTest()); + .run(TestPipeline.create()); expectedLogs.verifyInfo("Job finished with status DONE"); } @@ -223,7 +224,7 @@ public class BlockingDataflowPipelineRunnerTest { expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( JobIdMatcher.expectJobId("testFailedJob-jobId"))); createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED)) - .run(DirectPipeline.createForTest()); + .run(TestPipeline.create()); } /** @@ -236,8 +237,8 @@ public class BlockingDataflowPipelineRunnerTest { expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( JobIdMatcher.expectJobId("testCancelledJob-jobId"))); createMockRunner( - createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED)) - .run(DirectPipeline.createForTest()); + createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED)) + .run(TestPipeline.create()); } /** @@ -256,7 +257,7 @@ public class BlockingDataflowPipelineRunnerTest { DataflowPipelineJob replacedByJob = createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE); when(job.getReplacedByJob()).thenReturn(replacedByJob); - createMockRunner(job).run(DirectPipeline.createForTest()); + createMockRunner(job).run(TestPipeline.create()); } /** @@ -269,8 +270,8 @@ public class BlockingDataflowPipelineRunnerTest { public void testUnknownJobThrowsException() throws Exception { expectedThrown.expect(IllegalStateException.class); createMockRunner( - createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN)) - .run(DirectPipeline.createForTest()); + createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN)) + .run(TestPipeline.create()); } /** @@ -283,9 +284,8 @@ public class BlockingDataflowPipelineRunnerTest { expectedThrown.expect(DataflowServiceException.class); expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( JobIdMatcher.expectJobId("testNullJob-jobId"))); - createMockRunner( - createMockJob("testNullJob-projectId", "testNullJob-jobId", null)) - .run(DirectPipeline.createForTest()); + createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null)) + .run(TestPipeline.create()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index de4a5d2..4ef26d3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -18,29 +18,28 @@ package org.apache.beam.runners.spark.translation; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.values.PCollection; -import com.google.api.client.repackaged.com.google.common.base.Joiner; import com.google.common.base.Charsets; -import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestName; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Collections; import java.util.List; /** @@ -50,22 +49,7 @@ import java.util.List; * executed in Spark. */ public class TransformTranslatorTest { - - @Rule - public TestName name = new TestName(); - - private DirectPipelineRunner directRunner; - private SparkPipelineRunner sparkRunner; - private String testDataDirName; - - @Before public void init() throws IOException { - sparkRunner = SparkPipelineRunner.create(); - directRunner = DirectPipelineRunner.createForTest(); - testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName()) - + File.separator; - FileUtils.deleteDirectory(new File(testDataDirName)); - new File(testDataDirName).mkdirs(); - } + @Rule public TemporaryFolder tmp = new TemporaryFolder(); /** * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline @@ -74,8 +58,8 @@ public class TransformTranslatorTest { */ @Test public void testTextIOReadAndWriteTransforms() throws IOException { - String directOut = runPipeline("direct", directRunner); - String sparkOut = runPipeline("spark", sparkRunner); + String directOut = runPipeline(DirectPipelineRunner.class); + String sparkOut = runPipeline(SparkPipelineRunner.class); List<String> directOutput = Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8); @@ -84,18 +68,17 @@ public class TransformTranslatorTest { Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8); // sort output to get a stable result (PCollections are not ordered) - Collections.sort(directOutput); - Collections.sort(sparkOutput); - - Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray()); + assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray())); } - private String runPipeline(String name, PipelineRunner<?> runner) { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name); + private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(runner); + Pipeline p = Pipeline.create(options); + File outFile = tmp.newFile(); PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt")); - lines.apply(TextIO.Write.to(outFile)); - runner.run(p); - return outFile; + lines.apply(TextIO.Write.to(outFile.getAbsolutePath())); + p.run(); + return outFile.getAbsolutePath(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java index 718948e..4cdc0ca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java @@ -18,18 +18,17 @@ package org.apache.beam.sdk.options; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipeline; import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.values.PCollection; import com.fasterxml.jackson.annotation.JsonIgnore; /** - * Options that can be used to configure the {@link DirectPipeline}. + * Options that can be used to configure the {@link DirectPipelineRunner}. */ -public interface DirectPipelineOptions extends - ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, - PipelineOptions, StreamingOptions { +public interface DirectPipelineOptions + extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions, + StreamingOptions { /** * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java deleted file mode 100644 index 45f7647..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.DirectPipelineOptions; - -/** - * A {@link DirectPipeline} is a {@link Pipeline} that returns - * {@link DirectPipelineRunner.EvaluationResults} when it is - * {@link org.apache.beam.sdk.Pipeline#run()}. - */ -public class DirectPipeline extends Pipeline { - - /** - * Creates and returns a new DirectPipeline instance for tests. - */ - public static DirectPipeline createForTest() { - DirectPipelineRunner runner = DirectPipelineRunner.createForTest(); - return new DirectPipeline(runner, runner.getPipelineOptions()); - } - - private DirectPipeline(DirectPipelineRunner runner, DirectPipelineOptions options) { - super(runner, options); - } - - @Override - public DirectPipelineRunner.EvaluationResults run() { - return (DirectPipelineRunner.EvaluationResults) super.run(); - } - - @Override - public DirectPipelineRunner getRunner() { - return (DirectPipelineRunner) super.getRunner(); - } - - @Override - public String toString() { - return "DirectPipeline#" + hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java index 198d04e..3cb9703 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java @@ -34,8 +34,6 @@ import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.DirectPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -59,7 +57,6 @@ import org.apache.beam.sdk.util.MapAggregatorValues; import org.apache.beam.sdk.util.PerKeyCombineFnRunner; import org.apache.beam.sdk.util.PerKeyCombineFnRunners; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.common.Counter; @@ -195,18 +192,6 @@ public class DirectPipelineRunner } /** - * Constructs a runner with default properties for testing. - * - * @return The newly created runner. - */ - public static DirectPipelineRunner createForTest() { - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setStableUniqueNames(CheckEnabled.ERROR); - options.setGcpCredential(new TestCredential()); - return new DirectPipelineRunner(options); - } - - /** * Enable runtime testing to verify that all functions and {@link Coder} * instances can be serialized. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java index f32a420..f757b4e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java @@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipeline; -import org.apache.beam.sdk.runners.DirectPipelineRunner.EvaluationResults; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; @@ -137,12 +137,11 @@ public class AvroIOGeneratedClassTest { throws Exception { generateAvroFile(generateAvroObjects()); - DirectPipeline p = DirectPipeline.createForTest(); + TestPipeline p = TestPipeline.create(); PCollection<T> output = p.apply(read); - EvaluationResults results = p.run(); + PAssert.that(output).containsInAnyOrder(expectedOutput); + p.run(); assertEquals(expectedName, output.getName()); - assertThat(results.getPCollection(output), - containsInAnyOrder(expectedOutput)); } @Test @@ -257,7 +256,7 @@ public class AvroIOGeneratedClassTest { throws Exception { AvroGeneratedUser[] users = generateAvroObjects(); - DirectPipeline p = DirectPipeline.createForTest(); + TestPipeline p = TestPipeline.create(); @SuppressWarnings("unchecked") PCollection<T> input = p.apply(Create.of(Arrays.asList((T[]) users)) .withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 08f146f..57312c0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroIO.Write.Bound; -import org.apache.beam.sdk.runners.DirectPipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -116,7 +115,7 @@ public class AvroIOTest { @Test public void testAvroIOWriteAndReadASingleFile() throws Throwable { - DirectPipeline p = DirectPipeline.createForTest(); + TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -127,7 +126,7 @@ public class AvroIOTest { .withSchema(GenericClass.class)); p.run(); - p = DirectPipeline.createForTest(); + p = TestPipeline.create(); PCollection<GenericClass> input = p .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class)); @@ -179,7 +178,7 @@ public class AvroIOTest { */ @Test public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable { - DirectPipeline p = DirectPipeline.createForTest(); + TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -192,7 +191,7 @@ public class AvroIOTest { List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)); - p = DirectPipeline.createForTest(); + p = TestPipeline.create(); PCollection<GenericClassV2> input = p .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java index 2f5272b..ae3b4e0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java @@ -98,16 +98,17 @@ public class DirectPipelineRunnerTest implements Serializable { @Test public void testCoderException() { - DirectPipeline pipeline = DirectPipeline.createForTest(); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(DirectPipelineRunner.class); + Pipeline p = Pipeline.create(options); - pipeline - .apply("CreateTestData", Create.of(42)) + p.apply("CreateTestData", Create.of(42)) .apply("CrashDuringCoding", ParDo.of(new HelloDoFn())) .setCoder(new CrashingCoder<String>()); - expectedException.expect(RuntimeException.class); - expectedException.expectCause(isA(CoderException.class)); - pipeline.run(); + expectedException.expect(RuntimeException.class); + expectedException.expectCause(isA(CoderException.class)); + p.run(); } @Test @@ -119,7 +120,9 @@ public class DirectPipelineRunnerTest implements Serializable { @Test public void testTextIOWriteWithDefaultShardingStrategy() throws Exception { String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output"); - Pipeline p = DirectPipeline.createForTest(); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(DirectPipelineRunner.class); + Pipeline p = Pipeline.create(options); String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; p.apply(Create.of(expectedElements)) .apply(TextIO.Write.to(prefix).withSuffix("txt")); @@ -139,7 +142,9 @@ public class DirectPipelineRunnerTest implements Serializable { public void testTextIOWriteWithLimitedNumberOfShards() throws Exception { final int numShards = 3; String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput"); - Pipeline p = DirectPipeline.createForTest(); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(DirectPipelineRunner.class); + Pipeline p = Pipeline.create(options); String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; p.apply(Create.of(expectedElements)) .apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt")); @@ -164,7 +169,9 @@ public class DirectPipelineRunnerTest implements Serializable { @Test public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception { String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output"); - Pipeline p = DirectPipeline.createForTest(); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(DirectPipelineRunner.class); + Pipeline p = Pipeline.create(options); String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; p.apply(Create.of(expectedElements)) .apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro")); @@ -186,7 +193,9 @@ public class DirectPipelineRunnerTest implements Serializable { public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception { final int numShards = 3; String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput"); - Pipeline p = DirectPipeline.createForTest(); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(DirectPipelineRunner.class); + Pipeline p = Pipeline.create(options); String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; p.apply(Create.of(expectedElements)) .apply(AvroIO.Write.withSchema(String.class).to(prefix) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java deleted file mode 100644 index 9829ebd..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.junit.Assert.assertEquals; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link DirectPipeline}. */ -@RunWith(JUnit4.class) -public class DirectPipelineTest { - @Test - public void testToString() { - DirectPipeline pipeline = DirectPipeline.createForTest(); - assertEquals("DirectPipeline#" + pipeline.hashCode(), - pipeline.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index d926ac5..7690d2b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; @@ -116,7 +117,7 @@ public class TransformTreeTest { File inputFile = tmpFolder.newFile(); File outputFile = tmpFolder.newFile(); - Pipeline p = DirectPipeline.createForTest(); + Pipeline p = TestPipeline.create(); p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath())) .apply(Sample.<String>any(10)) @@ -173,7 +174,7 @@ public class TransformTreeTest { @Test(expected = IllegalStateException.class) public void testOutputChecking() throws Exception { - Pipeline p = DirectPipeline.createForTest(); + Pipeline p = TestPipeline.create(); p.apply(new InvalidCompositeTransform()); @@ -183,7 +184,7 @@ public class TransformTreeTest { @Test public void testMultiGraphSetup() { - Pipeline p = DirectPipeline.createForTest(); + Pipeline p = TestPipeline.create(); PCollection<Integer> input = p.begin() .apply(Create.of(1, 2, 3)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java index 6d62e08..6bc5c1e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.runners.DirectPipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn; @@ -69,7 +68,7 @@ public class ApproximateQuantilesTest { @Test public void testQuantilesGlobally() { - DirectPipeline p = DirectPipeline.createForTest(); + TestPipeline p = TestPipeline.create(); PCollection<Integer> input = intRangeCollection(p, 101); PCollection<List<Integer>> quantiles = @@ -82,7 +81,7 @@ public class ApproximateQuantilesTest { @Test public void testQuantilesGobally_comparable() { - DirectPipeline p = DirectPipeline.createForTest(); + TestPipeline p = TestPipeline.create(); PCollection<Integer> input = intRangeCollection(p, 101); PCollection<List<Integer>> quantiles =