Add TestFlinkPipelineRunner to FlinkRunnerRegistrar This makes the runner available for selection by integration tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58d66a34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58d66a34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58d66a34 Branch: refs/heads/master Commit: 58d66a344985eecc9cc3f43c0ecd5dbc7b4fb2e6 Parents: dc98211 Author: Kenneth Knowles <k...@google.com> Authored: Mon May 2 13:11:12 2016 -0700 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri May 20 08:08:24 2016 +0200 ---------------------------------------------------------------------- .../beam/runners/flink/FlinkPipelineRunner.java | 16 +---- .../runners/flink/FlinkRunnerRegistrar.java | 4 +- .../runners/flink/TestFlinkPipelineRunner.java | 66 ++++++++++++++++++++ .../beam/runners/flink/FlinkTestPipeline.java | 2 +- 4 files changed, 71 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index 3edf6f3..b5ffced 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -108,7 +108,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { this.flinkJobEnv.translate(pipeline); LOG.info("Starting execution of Flink program."); - + JobExecutionResult result; try { result = this.flinkJobEnv.executePipeline(); @@ -138,20 +138,6 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { return options; } - /** - * Constructs a runner with default properties for testing. - * - * @return The newly created runner. - */ - public static FlinkPipelineRunner createForTest(boolean streaming) { - FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - // we use [auto] for testing since this will make it pick up the Testing - // ExecutionEnvironment - options.setFlinkMaster("[auto]"); - options.setStreaming(streaming); - return new FlinkPipelineRunner(options); - } - @Override public <Output extends POutput, Input extends PInput> Output apply( PTransform<Input, Output> transform, Input input) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java index cd99f4e..ec61805 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java @@ -41,7 +41,9 @@ public class FlinkRunnerRegistrar { public static class Runner implements PipelineRunnerRegistrar { @Override public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class); + return ImmutableList.<Class<? extends PipelineRunner<?>>>of( + FlinkPipelineRunner.class, + TestFlinkPipelineRunner.class); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java new file mode 100644 index 0000000..24883c8 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { + + private FlinkPipelineRunner delegate; + + private TestFlinkPipelineRunner(FlinkPipelineOptions options) { + // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment + options.setFlinkMaster("[auto]"); + this.delegate = FlinkPipelineRunner.fromOptions(options); + } + + public static TestFlinkPipelineRunner fromOptions(PipelineOptions options) { + FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); + return new TestFlinkPipelineRunner(flinkOptions); + } + + public static TestFlinkPipelineRunner create(boolean streaming) { + FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + flinkOptions.setStreaming(streaming); + return TestFlinkPipelineRunner.fromOptions(flinkOptions); + } + + @Override + public <OutputT extends POutput, InputT extends PInput> + OutputT apply(PTransform<InputT,OutputT> transform, InputT input) { + return delegate.apply(transform, input); + } + + @Override + public FlinkRunnerResult run(Pipeline pipeline) { + return delegate.run(pipeline); + } + + public PipelineOptions getPipelineOptions() { + return delegate.getPipelineOptions(); + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java index f015a66..edde925 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java @@ -60,7 +60,7 @@ public class FlinkTestPipeline extends Pipeline { * @return The Test Pipeline. */ private static FlinkTestPipeline create(boolean streaming) { - FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming); + TestFlinkPipelineRunner flinkRunner = TestFlinkPipelineRunner.create(streaming); return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); }