[tests] add streaming mode to TestPipeline
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0be42cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0be42cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0be42cbf

Branch: refs/heads/master
Commit: 0be42cbf39ca0740e7f8e5f8faf38aa9126e8cf6
Parents: f0cb5f0
Author: Max <m...@posteo.de>
Authored: Mon Feb 22 16:31:23 2016 +0100
Committer: Davor Bonaci <davorbon...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/FlinkPipelineRunner.java |  3 +-
 .../flink/dataflow/FlinkTestPipeline.java   | 33 ++++++++++++++++++--
 2 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index f57fed2..ebd2691 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -150,11 +150,12 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
    *
    * @return The newly created runner.
    */
-  public static FlinkPipelineRunner createForTest() {
+  public static FlinkPipelineRunner createForTest(boolean streaming) {
     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
     // we use [auto] for testing since this will make it pick up the Testing
     // ExecutionEnvironment
     options.setFlinkMaster("[auto]");
+    options.setStreaming(streaming);
     return new FlinkPipelineRunner(options);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index a1f66c7..109b1ff 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -27,14 +27,41 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 public class FlinkTestPipeline extends Pipeline {

   /**
-   * Creates and returns a new test pipeline.
+   * Creates and returns a new test pipeline for batch execution.
    *
    * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
    * {@link Pipeline#run} to execute the pipeline and check the tests.
    */
   public static FlinkTestPipeline create() {
-    FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest();
-    return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+    return create(false);
+  }
+
+  /**
+   * Creates and returns a new test pipeline for streaming execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @return The Test Pipeline
+   */
+  public static FlinkTestPipeline createStreaming() {
+    return create(true);
+  }
+
+  /**
+   * Creates and returns a new test pipeline for streaming or batch execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @param streaming True for streaming mode, False for batch
+   * @return The Test Pipeline
+   */
+  public static FlinkTestPipeline create(boolean streaming) {
+    FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+    FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
+    pipelineOptions.setStreaming(streaming);
+    return new FlinkTestPipeline(flinkRunner, pipelineOptions);
   }

   private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions