Repository: incubator-beam Updated Branches: refs/heads/master 1eec6863f -> bcefff6a3
Remove the DataflowPipeline Class Pipelines that run with the DataflowPipelineRunner should be created using an appropriately constructed PipelineOptions (i.e. one with the runner set to DataflowPipelineRunner.class) Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1eee4bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1eee4bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1eee4bc Branch: refs/heads/master Commit: b1eee4bcca632eacf4a6dd724ed3eeb27ace0d77 Parents: 1eec686 Author: Thomas Groh <tg...@google.com> Authored: Fri Apr 15 09:23:08 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Apr 15 10:07:57 2016 -0700 ---------------------------------------------------------------------- .../sdk/options/DataflowPipelineOptions.java | 14 +- .../beam/sdk/runners/DataflowPipeline.java | 60 ------- .../sdk/runners/DataflowPipelineRegistrar.java | 4 +- .../sdk/runners/DataflowPipelineRunnerTest.java | 37 +++-- .../beam/sdk/runners/DataflowPipelineTest.java | 45 ------ .../runners/DataflowPipelineTranslatorTest.java | 159 +++++++++++++------ 6 files changed, 136 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java index 4eae85a..50fc956 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.options; -import org.apache.beam.sdk.runners.DataflowPipeline; +import org.apache.beam.sdk.runners.DataflowPipelineRunner; import com.google.common.base.MoreObjects; @@ -27,14 +27,14 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; /** - * Options that can be used to configure the {@link DataflowPipeline}. + * Options that can be used to configure the {@link DataflowPipelineRunner}. */ @Description("Options that configure the Dataflow pipeline.") -public interface DataflowPipelineOptions extends - PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions, - DataflowPipelineWorkerPoolOptions, BigQueryOptions, - GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions, - DataflowProfilingOptions, PubsubOptions { +public interface DataflowPipelineOptions + extends PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions, + DataflowPipelineWorkerPoolOptions, BigQueryOptions, GcsOptions, StreamingOptions, + CloudDebuggerOptions, DataflowWorkerLoggingOptions, DataflowProfilingOptions, + PubsubOptions { @Description("Project id. Required when running a Dataflow in the cloud. " + "See https://cloud.google.com/storage/docs/projects for further details.") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java deleted file mode 100644 index 4d91a38..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * A {@link DataflowPipeline} is a {@link Pipeline} that returns a - * {@link DataflowPipelineJob} when it is - * {@link org.apache.beam.sdk.Pipeline#run()}. - * - * <p>This is not intended for use by users of Cloud Dataflow. - * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a - * {@link Pipeline}. - */ -public class DataflowPipeline extends Pipeline { - - /** - * Creates and returns a new {@link DataflowPipeline} instance for tests. - */ - public static DataflowPipeline create(DataflowPipelineOptions options) { - return new DataflowPipeline(options); - } - - private DataflowPipeline(DataflowPipelineOptions options) { - super(DataflowPipelineRunner.fromOptions(options), options); - } - - @Override - public DataflowPipelineJob run() { - return (DataflowPipelineJob) super.run(); - } - - @Override - public DataflowPipelineRunner getRunner() { - return (DataflowPipelineRunner) super.getRunner(); - } - - @Override - public String toString() { - return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java index 9333d7d..b0f72ed 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java @@ -26,8 +26,8 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; /** - * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for - * the {@link DataflowPipeline}. + * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the + * {@link DataflowPipelineRunner}. */ public class DataflowPipelineRegistrar { private DataflowPipelineRegistrar() { } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java index 303acda..8b024fb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java @@ -125,7 +125,7 @@ import java.util.List; import java.util.Map; /** - * Tests for DataflowPipelineRunner. + * Tests for the {@link DataflowPipelineRunner}. */ @RunWith(JUnit4.class) public class DataflowPipelineRunnerTest { @@ -143,9 +143,10 @@ public class DataflowPipelineRunnerTest { assertNull(job.getCurrentState()); } - private DataflowPipeline buildDataflowPipeline(DataflowPipelineOptions options) { + private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { options.setStableUniqueNames(CheckEnabled.ERROR); - DataflowPipeline p = DataflowPipeline.create(options); + options.setRunner(DataflowPipelineRunner.class); + Pipeline p = Pipeline.create(options); p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); @@ -212,6 +213,7 @@ public class DataflowPipelineRunnerTest { private DataflowPipelineOptions buildPipelineOptions( ArgumentCaptor<Job> jobCaptor) throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); options.setProject(PROJECT_ID); options.setTempLocation("gs://somebucket/some/path"); // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. @@ -227,8 +229,8 @@ public class DataflowPipelineRunnerTest { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - DataflowPipeline p = buildDataflowPipeline(options); - DataflowPipelineJob job = p.run(); + Pipeline p = buildDataflowPipeline(options); + DataflowPipelineJob job = (DataflowPipelineJob) p.run(); assertEquals("newid", job.getJobId()); assertValidJob(jobCaptor.getValue()); } @@ -246,7 +248,7 @@ public class DataflowPipelineRunnerTest { resultJob.setClientRequestId("different_request_id"); when(mockRequest.execute()).thenReturn(resultJob); - DataflowPipeline p = buildDataflowPipeline(options); + Pipeline p = buildDataflowPipeline(options); try { p.run(); fail("Expected DataflowJobAlreadyExistsException"); @@ -265,8 +267,8 @@ public class DataflowPipelineRunnerTest { DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); options.setUpdate(true); options.setJobName("oldJobName"); - DataflowPipeline p = buildDataflowPipeline(options); - DataflowPipelineJob job = p.run(); + Pipeline p = buildDataflowPipeline(options); + DataflowPipelineJob job = (DataflowPipelineJob) p.run(); assertEquals("newid", job.getJobId()); assertValidJob(jobCaptor.getValue()); } @@ -279,7 +281,7 @@ public class DataflowPipelineRunnerTest { DataflowPipelineOptions options = buildPipelineOptions(); options.setUpdate(true); options.setJobName("badJobName"); - DataflowPipeline p = buildDataflowPipeline(options); + Pipeline p = buildDataflowPipeline(options); p.run(); } @@ -298,7 +300,7 @@ public class DataflowPipelineRunnerTest { resultJob.setClientRequestId("different_request_id"); when(mockRequest.execute()).thenReturn(resultJob); - DataflowPipeline p = buildDataflowPipeline(options); + Pipeline p = buildDataflowPipeline(options); thrown.expect(DataflowJobAlreadyUpdatedException.class); thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() { @@ -348,9 +350,9 @@ public class DataflowPipelineRunnerTest { options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); - DataflowPipeline p = buildDataflowPipeline(options); + Pipeline p = buildDataflowPipeline(options); - DataflowPipelineJob job = p.run(); + DataflowPipelineJob job = (DataflowPipelineJob) p.run(); assertEquals("newid", job.getJobId()); Job workflowJob = jobCaptor.getValue(); @@ -750,7 +752,7 @@ public class DataflowPipelineRunnerTest { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - DataflowPipeline p = DataflowPipeline.create(options); + Pipeline p = Pipeline.create(options); p.apply(Create.of(Arrays.asList(1, 2, 3))) .apply(new TestTransform()); @@ -758,7 +760,8 @@ public class DataflowPipelineRunnerTest { thrown.expect(IllegalStateException.class); thrown.expectMessage(Matchers.containsString("no translator registered")); DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()); + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); assertValidJob(jobCaptor.getValue()); } @@ -766,7 +769,7 @@ public class DataflowPipelineRunnerTest { public void testTransformTranslator() throws IOException { // Test that we can provide a custom translation DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline p = DataflowPipeline.create(options); + Pipeline p = Pipeline.create(options); TestTransform transform = new TestTransform(); p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of())) @@ -793,7 +796,7 @@ public class DataflowPipelineRunnerTest { }); translator.translate( - p, p.getRunner(), Collections.<DataflowPackage>emptyList()); + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); assertTrue(transform.translated); } @@ -828,7 +831,7 @@ public class DataflowPipelineRunnerTest { @Test public void testApplyIsScopedToExactClass() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline p = DataflowPipeline.create(options); + Pipeline p = Pipeline.create(options); Create.TimestampedValues<String> transform = Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java deleted file mode 100644 index 947b599..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link DataflowPipeline}. */ -@RunWith(JUnit4.class) -public class DataflowPipelineTest { - @Test - public void testToString() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setJobName("TestJobName"); - options.setProject("project-id"); - options.setTempLocation("gs://test/temp/location"); - options.setGcpCredential(new TestCredential()); - options.setPathValidatorClass(NoopPathValidator.class); - assertEquals("DataflowPipeline#TestJobName", - DataflowPipeline.create(options).toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java index 1429e5a..7a3caa6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java @@ -123,8 +123,9 @@ public class DataflowPipelineTranslatorTest implements Serializable { } } - private DataflowPipeline buildPipeline(DataflowPipelineOptions options) { - DataflowPipeline p = DataflowPipeline.create(options); + private Pipeline buildPipeline(DataflowPipelineOptions options) { + options.setRunner(DataflowPipelineRunner.class); + Pipeline p = Pipeline.create(options); p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); @@ -163,6 +164,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); options.setGcpCredential(new TestCredential()); options.setJobName("some-job-name"); options.setProject("some-project"); @@ -178,11 +180,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setRunner(DataflowPipelineRunner.class); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); // Note that the contents of this materialized map may be changed by the act of reading an @@ -212,11 +215,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setNetwork(testNetwork); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -228,11 +232,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { public void testNetworkConfigMissing() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -246,11 +251,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setSubnetwork(testSubnetwork); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -262,11 +268,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { public void testSubnetworkConfigMissing() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -277,11 +284,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { public void testScalingAlgorithmMissing() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -312,11 +320,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setAutoscalingAlgorithm(noScaling); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -346,11 +355,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { options.setMaxNumWorkers(42); options.setAutoscalingAlgorithm(noScaling); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -379,11 +389,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setZone(testZone); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -398,11 +409,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setWorkerMachineType(testMachineType); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -418,11 +430,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setDiskSizeGb(diskSizeGb); - DataflowPipeline p = buildPipeline(options); + Pipeline p = buildPipeline(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) - .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); assertEquals(1, job.getEnvironment().getWorkerPools().size()); @@ -442,13 +455,18 @@ public class DataflowPipelineTranslatorTest implements Serializable { Step predefinedStep = createPredefinedStep(); // Create a pipeline that the predefined step will be embedded into - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) .apply(ParDo.of(new NoOpFn())) .apply(new EmbeddedTransform(predefinedStep.clone())) .apply(ParDo.of(new NoOpFn())); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); List<Step> steps = job.getSteps(); assertEquals(4, steps.size()); @@ -491,13 +509,18 @@ public class DataflowPipelineTranslatorTest implements Serializable { private static Step createPredefinedStep() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); String stepName = "DoFn1"; pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) .apply(ParDo.of(new NoOpFn()).named(stepName)) .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out")); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); assertEquals(13, job.getSteps().size()); Step step = job.getSteps().get(1); @@ -617,7 +640,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Test public void testMultiGraphPipelineSerialization() throws IOException { - DataflowPipeline p = DataflowPipeline.create(buildPipelineOptions()); + Pipeline p = Pipeline.create(buildPipelineOptions()); PCollection<Integer> input = p.begin() .apply(Create.of(1, 2, 3)); @@ -629,12 +652,13 @@ public class DataflowPipelineTranslatorTest implements Serializable { PipelineOptionsFactory.as(DataflowPipelineOptions.class)); // Check that translation doesn't fail. - t.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()); + t.translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); } @Test public void testPartiallyBoundFailure() throws IOException { - Pipeline p = DataflowPipeline.create(buildPipelineOptions()); + Pipeline p = Pipeline.create(buildPipelineOptions()); PCollection<Integer> input = p.begin() .apply(Create.of(1, 2, 3)); @@ -651,7 +675,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Test public void testGoodWildcards() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); applyRead(pipeline, "gs://bucket/foo"); @@ -670,7 +694,10 @@ public class DataflowPipelineTranslatorTest implements Serializable { applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); // Check that translation doesn't fail. - t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()); + t.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()); } private void applyRead(Pipeline pipeline, String path) { @@ -684,7 +711,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Test public void testBadWildcardRecursive() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz")); @@ -693,7 +720,10 @@ public class DataflowPipelineTranslatorTest implements Serializable { thrown.expectCause(Matchers.allOf( instanceOf(IllegalArgumentException.class), ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); - t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()); + t.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()); } @Test @@ -706,11 +736,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { options.setExperiments(ImmutableList.of("disable_ism_side_input")); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); pipeline.apply(Create.of(1)) .apply(View.<Integer>asSingleton()); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); List<Step> steps = job.getSteps(); assertEquals(2, steps.size()); @@ -733,11 +768,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { options.setExperiments(ImmutableList.of("disable_ism_side_input")); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); pipeline.apply(Create.of(1, 2, 3)) .apply(View.<Integer>asIterable()); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); List<Step> steps = job.getSteps(); assertEquals(2, steps.size()); @@ -758,11 +798,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); pipeline.apply(Create.of(1)) .apply(View.<Integer>asSingleton()); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); List<Step> steps = job.getSteps(); assertEquals(5, steps.size()); @@ -786,11 +831,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); pipeline.apply(Create.of(1, 2, 3)) .apply(View.<Integer>asIterable()); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); List<Step> steps = job.getSteps(); assertEquals(3, steps.size()); @@ -810,7 +860,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { public void testStepDisplayData() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipeline pipeline = DataflowPipeline.create(options); + Pipeline pipeline = Pipeline.create(options); DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() { @Override @@ -845,8 +895,13 @@ public class DataflowPipelineTranslatorTest implements Serializable { .apply(ParDo.of(fn1)) .apply(ParDo.of(fn2)); - Job job = translator.translate( - pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.<DataflowPackage>emptyList()) + .getJob(); List<Step> steps = job.getSteps(); assertEquals(3, steps.size());