[tests] add streaming mode to TestPipeline

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0be42cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0be42cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0be42cbf

Branch: refs/heads/master
Commit: 0be42cbf39ca0740e7f8e5f8faf38aa9126e8cf6
Parents: f0cb5f0
Author: Max <m...@posteo.de>
Authored: Mon Feb 22 16:31:23 2016 +0100
Committer: Davor Bonaci <davorbon...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/FlinkPipelineRunner.java     |  3 +-
 .../flink/dataflow/FlinkTestPipeline.java       | 33 ++++++++++++++++++--
 2 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index f57fed2..ebd2691 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -150,11 +150,12 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
         *
         * @return The newly created runner.
         */
-       public static FlinkPipelineRunner createForTest() {
+       public static FlinkPipelineRunner createForTest(boolean streaming) {
                FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
                // we use [auto] for testing since this will make it pick up the Testing
                // ExecutionEnvironment
                options.setFlinkMaster("[auto]");
+               options.setStreaming(streaming);
                return new FlinkPipelineRunner(options);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index a1f66c7..109b1ff 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -27,14 +27,41 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 public class FlinkTestPipeline extends Pipeline {
 
        /**
-        * Creates and returns a new test pipeline.
+        * Creates and returns a new test pipeline for batch execution.
         *
         * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
         * {@link Pipeline#run} to execute the pipeline and check the tests.
         */
        public static FlinkTestPipeline create() {
-               FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest();
-               return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+               return create(false);
+       }
+
+       /**
+        * Creates and returns a new test pipeline for streaming execution.
+        *
+        * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+        * {@link Pipeline#run} to execute the pipeline and check the tests.
+        *
+        * @return The Test Pipeline
+        */
+       public static FlinkTestPipeline createStreaming() {
+               return create(true);
+       }
+
+       /**
+        * Creates and returns a new test pipeline for streaming or batch execution.
+        *
+        * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+        * {@link Pipeline#run} to execute the pipeline and check the tests.
+        *
+        * @param streaming True for streaming mode, False for batch
+        * @return The Test Pipeline
+        */
+       public static FlinkTestPipeline create(boolean streaming) {
+               FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+               FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
+               pipelineOptions.setStreaming(streaming);
+               return new FlinkTestPipeline(flinkRunner, pipelineOptions);
        }
 
        private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> 
runner, PipelineOptions

Reply via email to