[runner] add streaming support with checkpointing

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/edff0785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/edff0785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/edff0785

Branch: refs/heads/master
Commit: edff0785a82d2c6c01abeb3c64ca0d2958ccd0fd
Parents: 517c1bd
Author: Kostas Kloudas <kklou...@gmail.com>
Authored: Wed Dec 9 17:30:53 2015 +0100
Committer: Davor Bonaci <davorbon...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  28 +
 .../dataflow/FlinkJobExecutionEnvironment.java  | 238 ++++++++
 .../flink/dataflow/FlinkPipelineRunner.java     |  99 +--
 .../flink/dataflow/examples/WordCount.java      |   2 +-
 .../examples/streaming/AutoComplete.java        | 384 ++++++++++++
 .../examples/streaming/JoinExamples.java        | 157 +++++
 .../KafkaWindowedWordCountExample.java          | 138 +++++
 .../examples/streaming/WindowedWordCount.java   | 126 ++++
 .../FlinkBatchPipelineTranslator.java           | 152 +++++
 .../FlinkBatchTransformTranslators.java         | 593 ++++++++++++++++++
 .../FlinkBatchTranslationContext.java           | 129 ++++
 .../translation/FlinkPipelineTranslator.java    | 145 +----
 .../FlinkStreamingPipelineTranslator.java       | 138 +++++
 .../FlinkStreamingTransformTranslators.java     | 356 +++++++++++
 .../FlinkStreamingTranslationContext.java       |  86 +++
 .../translation/FlinkTransformTranslators.java  | 594 ------------------
 .../translation/TranslationContext.java         | 129 ----
 .../translation/types/CoderComparator.java      | 216 +++++++
 .../translation/types/CoderComperator.java      | 218 -------
 .../translation/types/CoderTypeInformation.java |   6 +-
 .../translation/types/CoderTypeSerializer.java  |   2 -
 .../translation/types/KvCoderComperator.java    |   2 +-
 .../types/VoidCoderTypeSerializer.java          |   1 -
 .../translation/wrappers/SourceInputFormat.java |   4 +-
 .../streaming/FlinkAbstractParDoWrapper.java    | 274 +++++++++
 .../FlinkGroupAlsoByWindowWrapper.java          | 601 +++++++++++++++++++
 .../streaming/FlinkGroupByKeyWrapper.java       |  56 ++
 .../streaming/FlinkParDoBoundMultiWrapper.java  |  72 +++
 .../streaming/FlinkParDoBoundWrapper.java       |  89 +++
 .../streaming/io/UnboundedFlinkSource.java      |  76 +++
 .../streaming/io/UnboundedSocketSource.java     | 228 +++++++
 .../streaming/io/UnboundedSourceWrapper.java    | 120 ++++
 .../state/AbstractFlinkTimerInternals.java      | 139 +++++
 .../streaming/state/FlinkStateInternals.java    | 533 ++++++++++++++++
 .../streaming/state/StateCheckpointReader.java  |  89 +++
 .../streaming/state/StateCheckpointUtils.java   | 152 +++++
 .../streaming/state/StateCheckpointWriter.java  | 127 ++++
 .../wrappers/streaming/state/StateType.java     |  67 +++
 .../streaming/GroupAlsoByWindowTest.java        | 507 ++++++++++++++++
 .../streaming/StateSerializationTest.java       | 257 ++++++++
 .../flink/dataflow/util/JoinExamples.java       |   4 +-
 41 files changed, 6157 insertions(+), 1177 deletions(-)
----------------------------------------------------------------------
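
For context, the new streaming path is selected purely through the pipeline options; a minimal
sketch, modelled on the streaming examples added below (only the option and runner names are
taken from this change, the rest is illustrative):

    // Sketch: enable the streaming translation path of the Flink runner.
    FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(FlinkPipelineOptions.class);
    options.setStreaming(true);                    // use DataStreams instead of DataSets
    options.setRunner(FlinkPipelineRunner.class);  // executes via FlinkJobExecutionEnvironment

    Pipeline p = Pipeline.create(options);
    // ... apply an unbounded source, windowing and transforms as in the examples below ...
    p.run();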


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 6102d74..14693b8 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -71,6 +71,18 @@
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${flink.version}</version>
                </dependency>
@@ -114,6 +126,22 @@
                                </exclusion>
                        </exclusions>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-all</artifactId>
+                       <version>1.9.5</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
new file mode 100644
index 0000000..66d60fa
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FlinkJobExecutionEnvironment {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class);
+
+       private final FlinkPipelineOptions options;
+
+       /**
+        * The Flink Batch execution environment. This is instantiated to either a
+        * {@link org.apache.flink.api.java.CollectionEnvironment},
+        * a {@link org.apache.flink.api.java.LocalEnvironment} or
+        * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+        * options.
+        */
+       private ExecutionEnvironment flinkBatchEnv;
+
+
+       /**
+        * The Flink Streaming execution environment. This is instantiated to either a
+        * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+        * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+        * on the configuration options, and more specifically, on the master URL.
+        */
+       private StreamExecutionEnvironment flinkStreamEnv;
+
+       /**
+        * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators
+        * into their Flink-based counterparts. Based on the options provided by the user, if the job
+        * is a streaming one, this is instantiated to a FlinkStreamingPipelineTranslator; otherwise,
+        * i.e. for a batch job, a FlinkBatchPipelineTranslator is created.
+        */
+       private FlinkPipelineTranslator flinkPipelineTranslator;
+
+       public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) {
+               if (options == null) {
+                       throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL.");
+               }
+               this.options = options;
+               this.createJobEnvironment();
+               this.createJobGraphTranslator();
+       }
+
+       /**
+        * Depending on the type of job (Streaming or Batch) and the user-specified options,
+        * this method creates the appropriate ExecutionEnvironment.
+        */
+       private void createJobEnvironment() {
+               if (options.isStreaming()) {
+                       LOG.info("Creating the required STREAMING Environment.");
+                       createStreamExecutionEnvironment();
+               } else {
+                       LOG.info("Creating the required BATCH Environment.");
+                       createBatchExecutionEnvironment();
+               }
+       }
+
+       /**
+        * Depending on the type of job (Streaming or Batch), this method creates the appropriate
+        * job graph translator. In the case of batch, it will work with DataSets, while for
+        * streaming, it will work with DataStreams.
+        */
+       private void createJobGraphTranslator() {
+               checkInitializationState();
+               if (this.flinkPipelineTranslator != null) {
+                       throw new IllegalStateException("JobGraphTranslator already initialized.");
+               }
+
+               this.flinkPipelineTranslator = options.isStreaming() ?
+                               new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+                               new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+       }
+
+       public void translate(Pipeline pipeline) {
+               checkInitializationState();
+               if (this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+                       createJobEnvironment();
+               }
+               if (this.flinkPipelineTranslator == null) {
+                       createJobGraphTranslator();
+               }
+               this.flinkPipelineTranslator.translate(pipeline);
+       }
+
+       public JobExecutionResult executeJob() throws Exception {
+               if (options.isStreaming()) {
+
+                       if (this.flinkStreamEnv == null) {
+                               throw new RuntimeException("JobExecutionEnvironment not initialized.");
+                       }
+                       if (this.flinkPipelineTranslator == null) {
+                               throw new RuntimeException("JobGraphTranslator not initialized.");
+                       }
+
+                       LOG.info("Plan: {}", this.flinkStreamEnv.getExecutionPlan());
+                       return this.flinkStreamEnv.execute();
+               } else {
+                       if (this.flinkBatchEnv == null) {
+                               throw new RuntimeException("JobExecutionEnvironment not initialized.");
+                       }
+                       if (this.flinkPipelineTranslator == null) {
+                               throw new RuntimeException("JobGraphTranslator not initialized.");
+                       }
+                       return this.flinkBatchEnv.execute();
+               }
+       }
+
+       /**
+        * If the submitted job is a batch processing job, this method creates the appropriate
+        * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+        * on the user-specified options.
+        */
+       private void createBatchExecutionEnvironment() {
+               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+                       throw new RuntimeException("JobExecutionEnvironment already initialized.");
+               }
+
+               String masterUrl = options.getFlinkMaster();
+               this.flinkStreamEnv = null;
+
+               // depending on the master, create the right environment.
+               if (masterUrl.equals("[local]")) {
+                       this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+               } else if (masterUrl.equals("[collection]")) {
+                       this.flinkBatchEnv = new CollectionEnvironment();
+               } else if (masterUrl.equals("[auto]")) {
+                       this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+               } else if (masterUrl.matches(".*:\\d*")) {
+                       String[] parts = masterUrl.split(":");
+                       List<String> stagingFiles = options.getFilesToStage();
+                       this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+                                       Integer.parseInt(parts[1]),
+                                       stagingFiles.toArray(new String[stagingFiles.size()]));
+               } else {
+                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+                       this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+               }
+
+               // set the correct parallelism.
+               if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+                       this.flinkBatchEnv.setParallelism(options.getParallelism());
+               }
+
+               // set parallelism in the options (required by some execution code)
+               options.setParallelism(flinkBatchEnv.getParallelism());
+       }
+
+       /**
+        * If the submitted job is a stream processing job, this method creates the appropriate
+        * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+        * on the user-specified options.
+        */
+       private void createStreamExecutionEnvironment() {
+               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+                       throw new RuntimeException("JobExecutionEnvironment already initialized.");
+               }
+
+               String masterUrl = options.getFlinkMaster();
+               this.flinkBatchEnv = null;
+
+               // depending on the master, create the right environment.
+               if (masterUrl.equals("[local]")) {
+                       this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+               } else if (masterUrl.equals("[auto]")) {
+                       this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+               } else if (masterUrl.matches(".*:\\d*")) {
+                       String[] parts = masterUrl.split(":");
+                       List<String> stagingFiles = options.getFilesToStage();
+                       this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+                                       Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+               } else {
+                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+                       this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+               }
+
+               // set the correct parallelism.
+               if (options.getParallelism() != -1) {
+                       this.flinkStreamEnv.setParallelism(options.getParallelism());
+               }
+
+               // set parallelism in the options (required by some execution code)
+               options.setParallelism(flinkStreamEnv.getParallelism());
+
+               // although we do not use the generated timestamps,
+               // enabling timestamps is needed for the watermarks.
+               this.flinkStreamEnv.getConfig().enableTimestamps();
+
+               this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               this.flinkStreamEnv.enableCheckpointing(1000);
+               this.flinkStreamEnv.setNumberOfExecutionRetries(5);
+
+               LOG.info("Setting execution retry delay to 3 sec");
+               this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000);
+       }
+
+       private void checkInitializationState() {
+               if (this.options == null) {
+                       throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet.");
+               }
+
+               if (options.isStreaming() && this.flinkBatchEnv != null) {
+                       throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+               } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+                       throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+               }
+       }
+}
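
The runner further down drives this class in three steps; roughly (a sketch of the intended
lifecycle, not a verbatim excerpt from the diff):

    // Sketch: how FlinkPipelineRunner uses FlinkJobExecutionEnvironment.
    FlinkJobExecutionEnvironment env = new FlinkJobExecutionEnvironment(options);
    env.translate(pipeline);                       // build the DataSet or DataStream program
    JobExecutionResult result = env.executeJob();  // run locally, in collections, or on flinkMaster

Recognized values for flinkMaster are "[local]", "[collection]" (batch only), "[auto]", or a
"host:port" pair pointing at a remote cluster; anything else falls back to "[auto]" with a warning.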

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index ae31f48..f57fed2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -15,7 +15,6 @@
  */
 package com.dataartisans.flink.dataflow;
 
-import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -28,8 +27,6 @@ import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,27 +42,19 @@ import java.util.Map;
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to a Flink Plan and then executing them either locally
  * or on a Flink cluster, depending on the configuration.
- *
+ * <p>
  * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
  */
 public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
        private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
 
-       /** Provided options. */
-       private final FlinkPipelineOptions options;
-
        /**
-        * The Flink execution environment. This is instantiated to either a
-        * {@link org.apache.flink.api.java.CollectionEnvironment},
-        * a {@link org.apache.flink.api.java.LocalEnvironment} or
-        * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
-        * options.
+        * Provided options.
         */
-       private final ExecutionEnvironment flinkEnv;
+       private final FlinkPipelineOptions options;
 
-       /** Translator for this FlinkPipelineRunner, based on options. */
-       private final FlinkPipelineTranslator translator;
+       private final FlinkJobExecutionEnvironment flinkJobEnv;
 
        /**
         * Construct a runner from the provided options.
@@ -109,90 +98,38 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
                        flinkOptions.setFlinkMaster("[auto]");
                }
 
-               if (flinkOptions.isStreaming()) {
-                       throw new RuntimeException("Streaming is currently not supported.");
-               }
-
                return new FlinkPipelineRunner(flinkOptions);
        }
 
        private FlinkPipelineRunner(FlinkPipelineOptions options) {
                this.options = options;
-               this.flinkEnv = createExecutionEnvironment(options);
-
-               // set parallelism in the options (required by some execution code)
-               options.setParallelism(flinkEnv.getParallelism());
-
-               this.translator = new FlinkPipelineTranslator(flinkEnv, options);
-       }
-
-       /**
-        * Create Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
-        * on the options.
-        */
-       private ExecutionEnvironment createExecutionEnvironment(FlinkPipelineOptions options) {
-               String masterUrl = options.getFlinkMaster();
-
-
-               if (masterUrl.equals("[local]")) {
-                       ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-                       if (options.getParallelism() != -1) {
-                               env.setParallelism(options.getParallelism());
-                       }
-                       return env;
-               } else if (masterUrl.equals("[collection]")) {
-                       return new CollectionEnvironment();
-               } else if (masterUrl.equals("[auto]")) {
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       if (options.getParallelism() != -1) {
-                               env.setParallelism(options.getParallelism());
-                       }
-                       return env;
-               } else if (masterUrl.matches(".*:\\d*")) {
-                       String[] parts = masterUrl.split(":");
-                       List<String> stagingFiles = options.getFilesToStage();
-                       ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(parts[0],
-                                       Integer.parseInt(parts[1]),
-                                       stagingFiles.toArray(new String[stagingFiles.size()]));
-                       if (options.getParallelism() != -1) {
-                               env.setParallelism(options.getParallelism());
-                       }
-                       return env;
-               } else {
-                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       if (options.getParallelism() != -1) {
-                               env.setParallelism(options.getParallelism());
-                       }
-                       return env;
-               }
+               this.flinkJobEnv = new FlinkJobExecutionEnvironment(options);
        }
 
        @Override
        public FlinkRunnerResult run(Pipeline pipeline) {
                LOG.info("Executing pipeline using FlinkPipelineRunner.");
-               
+
                LOG.info("Translating pipeline to Flink program.");
-               
-               translator.translate(pipeline);
-               
+
+               this.flinkJobEnv.translate(pipeline);
+
                LOG.info("Starting execution of Flink program.");
                
                JobExecutionResult result;
                try {
-                       result = flinkEnv.execute();
-               }
-               catch (Exception e) {
+                       result = this.flinkJobEnv.executeJob();
+               } catch (Exception e) {
                        LOG.error("Pipeline execution failed", e);
                        throw new RuntimeException("Pipeline execution failed", e);
                }
-               
+
                LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-               
+
                Map<String, Object> accumulators = result.getAllAccumulatorResults();
                if (accumulators != null && !accumulators.isEmpty()) {
                        LOG.info("Final aggregator values:");
-                       
+
                        for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
                                LOG.info("{} : {}", entry.getKey(), entry.getValue());
                        }
@@ -230,16 +167,18 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
        /////////////////////////////////////////////////////////////////////////////
 
        @Override
-       public String toString() { return "DataflowPipelineRunner#" + hashCode(); }
+       public String toString() {
+               return "DataflowPipelineRunner#" + hashCode();
+       }
 
        /**
         * Attempts to detect all the resources the class loader has access to. This does not recurse
         * to class loader parents stopping it from pulling in resources from the system class loader.
         *
         * @param classLoader The URLClassLoader to use to detect resources to stage.
-        * @throws IllegalArgumentException  If either the class loader is not a URLClassLoader or one
-        * of the resources the class loader exposes is not a file resource.
         * @return A list of absolute paths to the resources the class loader uses.
+        * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+        *                                  of the resources the class loader exposes is not a file resource.
         */
        protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
                if (!(classLoader instanceof URLClassLoader)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
index 82f1e46..7857778 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
@@ -43,7 +43,7 @@ public class WordCount {
                String getOutput();
                void setOutput(String value);
        }
-       
+
        public static void main(String[] args) {
 
                Options options = PipelineOptionsFactory.fromArgs(args).withValidation()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
new file mode 100644
index 0000000..0245a7b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <ul>
+ *     <li><code>nc -lk 9999</code></li>
+ * </ul>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class AutoComplete {
+
+  /**
+   * A PTransform that takes as input a list of tokens and returns
+   * the most common tokens per prefix.
+   */
+  public static class ComputeTopCompletions
+      extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+      PCollection<CompletionCandidate> candidates = input
+        // First count how often each token appears.
+        .apply(new Count.PerElement<String>())
+
+        // Map the KV outputs of Count into our own CompletionCandidate class.
+        .apply(ParDo.named("CreateCompletionCandidates").of(
+            new DoFn<KV<String, Long>, CompletionCandidate>() {
+              private static final long serialVersionUID = 0;
+
+              @Override
+              public void processElement(ProcessContext c) {
+                CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+                c.output(cand);
+              }
+            }));
+
+      // Compute the top via either a flat or recursive algorithm.
+      if (recursive) {
+        return candidates
+          .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
+          .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+      } else {
+        return candidates
+          .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
+      }
+    }
+  }
+
+  /**
+   * Lower latency, but more expensive.
+   */
+  private static class ComputeTopFlat
+      extends PTransform<PCollection<CompletionCandidate>,
+                         PCollection<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+        PCollection<CompletionCandidate> input) {
+      return input
+        // For each completion candidate, map it to all prefixes.
+        .apply(ParDo.of(new AllPrefixes(minPrefix)))
+
+        // Find and return the top candidates for each prefix.
+        .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
+             .withHotKeyFanout(new HotKeyFanout()));
+    }
+
+    private static class HotKeyFanout implements SerializableFunction<String, Integer> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public Integer apply(String input) {
+        return (int) Math.pow(4, 5 - input.length());
+      }
+    }
+  }
+
+  /**
+   * Cheaper but higher latency.
+   *
+   * <p> Returns two PCollections, the first is top prefixes of size greater
+   * than minPrefix, and the second is top prefixes of size exactly
+   * minPrefix.
+   */
+  private static class ComputeTopRecursive
+      extends PTransform<PCollection<CompletionCandidate>,
+                         PCollectionList<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
+        return elem.getKey().length() > minPrefix ? 0 : 1;
+      }
+    }
+
+    private static class FlattenTops
+        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public void processElement(ProcessContext c) {
+        for (CompletionCandidate cc : c.element().getValue()) {
+          c.output(cc);
+        }
+      }
+    }
+
+    @Override
+    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+          PCollection<CompletionCandidate> input) {
+        if (minPrefix > 10) {
+          // Base case, partitioning to return the output in the expected format.
+          return input
+            .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
+            .apply(Partition.of(2, new KeySizePartitionFn()));
+        } else {
+          // If a candidate is in the top N for prefix a...b, it must also be in the top
+          // N for a...bX for every X, which is typically a much smaller set to consider.
+          // First, compute the top candidate for prefixes of size at least minPrefix + 1.
+          PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
+            .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
+          // Consider the top candidates for each prefix of length minPrefix + 1...
+          PCollection<KV<String, List<CompletionCandidate>>> small =
+            PCollectionList
+            .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
+            // ...together with those (previously excluded) candidates of length
+            // exactly minPrefix...
+            .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
+                    private static final long serialVersionUID = 0;
+
+                    @Override
+                    public Boolean apply(CompletionCandidate c) {
+                      return c.getValue().length() == minPrefix;
+                    }
+                  })))
+            .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
+            // ...set the key to be the minPrefix-length prefix...
+            .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
+            // ...and (re)apply the Top operator to all of them together.
+            .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
+
+          PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
+              .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+
+          return PCollectionList.of(flattenLarger).and(small);
+        }
+    }
+  }
+
+  /**
+   * A DoFn that keys each candidate by all its prefixes.
+   */
+  private static class AllPrefixes
+      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+    private static final long serialVersionUID = 0;
+
+    private final int minPrefix;
+    private final int maxPrefix;
+    public AllPrefixes(int minPrefix) {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+    public AllPrefixes(int minPrefix, int maxPrefix) {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+    @Override
+      public void processElement(ProcessContext c) {
+      String word = c.element().value;
+      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+        KV kv = KV.of(word.substring(0, i), c.element());
+        c.output(kv);
+      }
+    }
+  }
+
+  /**
+   * Class used to store tag-count pairs.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class CompletionCandidate implements Comparable<CompletionCandidate> {
+    private long count;
+    private String value;
+
+    public CompletionCandidate(String value, long count) {
+      this.value = value;
+      this.count = count;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    // Empty constructor required for Avro decoding.
+    @SuppressWarnings("unused")
+    public CompletionCandidate() {}
+
+    @Override
+    public int compareTo(CompletionCandidate o) {
+      if (this.count < o.count) {
+        return -1;
+      } else if (this.count == o.count) {
+        return this.value.compareTo(o.value);
+      } else {
+        return 1;
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof CompletionCandidate) {
+        CompletionCandidate that = (CompletionCandidate) other;
+        return this.count == that.count && this.value.equals(that.value);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.valueOf(count).hashCode() ^ value.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "CompletionCandidate[" + value + ", " + count + "]";
+    }
+  }
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+            createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes as input the top candidates per prefix, and formats them as a string
+   * suitable for writing to a per-task local file.
+   */
+  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) {
+      StringBuilder str = new StringBuilder();
+      KV<String, List<CompletionCandidate>> elem = c.element();
+
+      str.append(elem.getKey() +" @ "+ c.window() +" -> ");
+      for(CompletionCandidate cand: elem.getValue()) {
+        str.append(cand.toString() + " ");
+      }
+      System.out.println(str.toString());
+      c.output(str.toString());
+    }
+  }
+
+  /**
+   * Options supported by this class.
+   *
+   * <p> Inherits standard Dataflow configuration options.
+   */
+  private static interface Options extends WindowedWordCount.StreamingWordCountOptions {
+    @Description("Whether to use the recursive algorithm")
+    @Default.Boolean(true)
+    Boolean getRecursive();
+    void setRecursive(Boolean value);
+  }
+
+  public static void main(String[] args) throws IOException {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    options.setStreaming(true);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    PTransform<? super PBegin, PCollection<String>> readSource =
+            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
+    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    // Create the pipeline.
+    Pipeline p = Pipeline.create(options);
+    PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
+      .apply(readSource)
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .apply(Window.<String>into(windowFn)
+              .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes())
+      .apply(ComputeTopCompletions.top(10, options.getRecursive()));
+
+    toWrite
+      .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+      .apply(TextIO.Write.to("./outputAutoComplete.txt"));
+
+    p.run();
+  }
+}
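
All three streaming examples in this commit share the same windowing idiom; condensed, and with a
hypothetical input collection and window size, it reads:

    // Sketch: fixed event-time windows, firing once at the watermark, discarding fired panes.
    PCollection<String> windowed = input
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(windowSize)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());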

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
new file mode 100644
index 0000000..b0cc4fa
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.joda.time.Duration;
+
+/**
+ * To run the example, first open two sockets on two terminals by executing the commands:
+ * <ul>
+ *     <li><code>nc -lk 9999</code>, and</li>
+ *     <li><code>nc -lk 9998</code></li>
+ * </ul>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class JoinExamples {
+
+       static PCollection<String> joinEvents(PCollection<String> streamA,
+                                                                                 PCollection<String> streamB) throws Exception {
+
+               final TupleTag<String> firstInfoTag = new TupleTag<String>();
+               final TupleTag<String> secondInfoTag = new TupleTag<String>();
+
+               // transform both input collections to tuple collections, where the keys are country
+               // codes in both cases.
+               PCollection<KV<String, String>> firstInfo = streamA.apply(
+                               ParDo.of(new ExtractEventDataFn()));
+               PCollection<KV<String, String>> secondInfo = streamB.apply(
+                               ParDo.of(new ExtractEventDataFn()));
+
+               // country code 'key' -> CGBKR (<event info>, <country name>)
+               PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+                               .of(firstInfoTag, firstInfo)
+                               .and(secondInfoTag, secondInfo)
+                               .apply(CoGroupByKey.<String>create());
+
+               // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+               // country code 'key' -> string of <event info>, <country name>
+               PCollection<KV<String, String>> finalResultCollection =
+                               kvpCollection.apply(ParDo.named("Process").of(
+                                               new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+                                                       private static final long serialVersionUID = 0;
+
+                                                       @Override
+                                                       public void processElement(ProcessContext c) {
+                                                               KV<String, CoGbkResult> e = c.element();
+                                                               String key = e.getKey();
+
+                                                               String defaultA = "NO_VALUE";
+
+                                                               // the following getOnly is a bit tricky because it expects to have
+                                                               // EXACTLY ONE value in the corresponding stream and for the corresponding key.
+
+                                                               String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
+                                                               for (String lineB : c.element().getValue().getAll(secondInfoTag)) {
+                                                                       // Generate a string that combines information from both collection values
+                                                                       c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
+                                                               }
+                                                       }
+                                               }));
+
+               return finalResultCollection
+                               .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+                                       private static final long serialVersionUID = 0;
+
+                                       @Override
+                                       public void processElement(ProcessContext c) {
+                                               String result = c.element().getKey() + " -> " + c.element().getValue();
+                                               System.out.println(result);
+                                               c.output(result);
+                                       }
+                               }));
+       }
+
+       static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+               private static final long serialVersionUID = 0;
+
+               @Override
+               public void processElement(ProcessContext c) {
+                       String line = c.element().toLowerCase();
+                       String key = line.split("\\s")[0];
+                       c.output(KV.of(key, line));
+               }
+       }
+
+       private static interface Options extends WindowedWordCount.StreamingWordCountOptions {
+
+       }
+
+       public static void main(String[] args) throws Exception {
+               Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+               // make it a streaming example.
+               options.setStreaming(true);
+               options.setRunner(FlinkPipelineRunner.class);
+
+               PTransform<? super PBegin, PCollection<String>> readSourceA =
+                               Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
+               PTransform<? super PBegin, PCollection<String>> readSourceB =
+                               Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
+
+               WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+               Pipeline p = Pipeline.create(options);
+
+               // the following two 'applys' create multiple inputs to our pipeline, one for each
+               // of our two input sources.
+               PCollection<String> streamA = p.apply(readSourceA)
+                               .apply(Window.<String>into(windowFn)
+                                               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+                                               .discardingFiredPanes());
+               PCollection<String> streamB = p.apply(readSourceB)
+                               .apply(Window.<String>into(windowFn)
+                                               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+                                               .discardingFiredPanes());
+
+               PCollection<String> formattedResults = joinEvents(streamA, streamB);
+               formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
+               p.run();
+       }
+
+}
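
Stripped of windowing and formatting, joinEvents above is the standard CoGroupByKey pattern; its
shape (sketch only, with the two keyed inputs firstInfo and secondInfo assumed to exist) is:

    // Sketch: tag each keyed input, group them under one key space, then read values per tag.
    final TupleTag<String> firstInfoTag = new TupleTag<String>();
    final TupleTag<String> secondInfoTag = new TupleTag<String>();

    PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(firstInfoTag, firstInfo)            // PCollection<KV<String, String>>
        .and(secondInfoTag, secondInfo)         // PCollection<KV<String, String>>
        .apply(CoGroupByKey.<String>create());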

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
new file mode 100644
index 0000000..46c9bd6
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.joda.time.Duration;
+
+import java.util.Properties;
+
+public class KafkaWindowedWordCountExample {
+
+       static final String KAFKA_TOPIC = "test";  // Default kafka topic to read from
+       static final String KAFKA_BROKER = "localhost:9092";  // Default kafka broker to contact
+       static final String GROUP_ID = "myGroup";  // Default groupId
+       static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
+
+       public static class ExtractWordsFn extends DoFn<String, String> {
+               private final Aggregator<Long, Long> emptyLines =
+                               createAggregator("emptyLines", new Sum.SumLongFn());
+
+               @Override
+               public void processElement(ProcessContext c) {
+                       if (c.element().trim().isEmpty()) {
+                               emptyLines.addValue(1L);
+                       }
+
+                       // Split the line into words.
+                       String[] words = c.element().split("[^a-zA-Z']+");
+
+                       // Output each word encountered into the output PCollection.
+                       for (String word : words) {
+                               if (!word.isEmpty()) {
+                                       c.output(word);
+                               }
+                       }
+               }
+       }
+
+       public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+               @Override
+               public void processElement(ProcessContext c) {
+                       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+                       System.out.println(row);
+                       c.output(row);
+               }
+       }
+
+       public static interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+               @Description("The Kafka topic to read from")
+               @Default.String(KAFKA_TOPIC)
+               String getKafkaTopic();
+
+               void setKafkaTopic(String value);
+
+               @Description("The Kafka Broker to read from")
+               @Default.String(KAFKA_BROKER)
+               String getBroker();
+
+               void setBroker(String value);
+
+               @Description("The Zookeeper server to connect to")
+               @Default.String(ZOOKEEPER)
+               String getZookeeper();
+
+               void setZookeeper(String value);
+
+               @Description("The groupId")
+               @Default.String(GROUP_ID)
+               String getGroup();
+
+               void setGroup(String value);
+
+       }
+
+       public static void main(String[] args) {
+               PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
+               KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+               options.setJobName("KafkaExample");
+               options.setStreaming(true);
+               options.setRunner(FlinkPipelineRunner.class);
+
+               System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " " + options.getGroup());
+               Pipeline pipeline = Pipeline.create(options);
+
+               Properties p = new Properties();
+               p.setProperty("zookeeper.connect", options.getZookeeper());
+               p.setProperty("bootstrap.servers", options.getBroker());
+               p.setProperty("group.id", options.getGroup());
+
+               // This is the Flink consumer that reads the input to
+               // the program from the Kafka topic.
+               FlinkKafkaConsumer082<String> kafkaConsumer = new FlinkKafkaConsumer082<>(
+                               options.getKafkaTopic(),
+                               new SimpleStringSchema(), p);
+
+               PCollection<String> words = pipeline
+                               .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+                               .apply(ParDo.of(new ExtractWordsFn()))
+                               .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+                                               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+                                               .discardingFiredPanes());
+
+               PCollection<KV<String, Long>> wordCounts =
+                               words.apply(Count.<String>perElement());
+
+               wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+                               .apply(TextIO.Write.to("./outputKafka.txt"));
+
+               pipeline.run();
+       }
+}
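
For readers trying out the example above: the KafkaStreamingWordCountOptions interface can also be populated programmatically rather than through command-line flags. The sketch below assumes the imports of KafkaWindowedWordCountExample; the topic, broker, ZooKeeper, and group values are placeholders, not defaults taken from this commit.

    KafkaStreamingWordCountOptions options =
            PipelineOptionsFactory.create().as(KafkaStreamingWordCountOptions.class);
    options.setKafkaTopic("words");              // placeholder topic name
    options.setBroker("kafka-host:9092");        // placeholder broker address
    options.setZookeeper("zk-host:2181");        // placeholder ZooKeeper address
    options.setGroup("wordcount-consumers");     // placeholder consumer group
    options.setWindowSize(30L);                  // inherited from StreamingWordCountOptions
    options.setStreaming(true);
    options.setRunner(FlinkPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);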

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
new file mode 100644
index 0000000..1d4a44b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <pre>
+ *     nc -lk 9999
+ * </pre>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class WindowedWordCount {
+
+       private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
+
+       static final long WINDOW_SIZE = 10;  // Default window duration in seconds
+       static final long SLIDE_SIZE = 5;  // Default window slide in seconds
+
+       static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+               @Override
+               public void processElement(ProcessContext c) {
+                       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+                       c.output(row);
+               }
+       }
+
+       static class ExtractWordsFn extends DoFn<String, String> {
+               private final Aggregator<Long, Long> emptyLines =
+                               createAggregator("emptyLines", new Sum.SumLongFn());
+
+               @Override
+               public void processElement(ProcessContext c) {
+                       if (c.element().trim().isEmpty()) {
+                               emptyLines.addValue(1L);
+                       }
+
+                       // Split the line into words.
+                       String[] words = c.element().split("[^a-zA-Z']+");
+
+                       // Output each word encountered into the output PCollection.
+                       for (String word : words) {
+                               if (!word.isEmpty()) {
+                                       c.output(word);
+                               }
+                       }
+               }
+       }
+
+       public static interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options {
+               @Description("Sliding window duration, in seconds")
+               @Default.Long(WINDOW_SIZE)
+               Long getWindowSize();
+
+               void setWindowSize(Long value);
+
+               @Description("Window slide, in seconds")
+               @Default.Long(SLIDE_SIZE)
+               Long getSlide();
+
+               void setSlide(Long value);
+       }
+
+       public static void main(String[] args) throws IOException {
+               StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+               options.setStreaming(true);
+               options.setWindowSize(10L);
+               options.setSlide(5L);
+               options.setRunner(FlinkPipelineRunner.class);
+
+               LOG.info("Windowed WordCount with Sliding Windows of " + options.getWindowSize() +
+                               " sec. and a slide of " + options.getSlide());
+
+               Pipeline pipeline = Pipeline.create(options);
+
+               PCollection<String> words = pipeline
+                               .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+                               .apply(ParDo.of(new ExtractWordsFn()))
+                               .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+                                               .every(Duration.standardSeconds(options.getSlide())))
+                                               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+                                               .discardingFiredPanes());
+
+               PCollection<KV<String, Long>> wordCounts =
+                               words.apply(Count.<String>perElement());
+
+               wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+                               .apply(TextIO.Write.to("./outputWordCount.txt"));
+
+               pipeline.run();
+       }
+}
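
The Javadoc of WindowedWordCount suggests feeding the pipeline with "nc -lk 9999". Where netcat is not available, a small stand-in can be written in plain Java; the helper below is only illustrative and not part of this commit, and port 9999 simply mirrors the value hard-coded in main() above.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;

    // Listens on port 9999 and forwards stdin lines to the connected client,
    // so every line typed in the terminal becomes one element of the unbounded source.
    public class SocketInputFeeder {
        public static void main(String[] args) throws Exception {
            try (ServerSocket server = new ServerSocket(9999);
                 Socket client = server.accept();
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                 BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in))) {
                String line;
                while ((line = stdin.readLine()) != null) {
                    out.println(line);
                }
            }
        }
    }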

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..8c0183e
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}.
+ */
+public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+       /**
+        * The necessary context in the case of a batch job.
+        */
+       private final FlinkBatchTranslationContext batchContext;
+
+       private int depth = 0;
+
+       /**
+        * Composite transform that we want to translate before proceeding with other transforms.
+        */
+       private PTransform<?, ?> currentCompositeTransform;
+
+       public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+               this.batchContext = new FlinkBatchTranslationContext(env, options);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Pipeline Visitor Methods
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void enterCompositeTransform(TransformTreeNode node) {
+               System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+               PTransform<?, ?> transform = node.getTransform();
+               if (transform != null && currentCompositeTransform == null) {
+
+                       BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+                       if (translator != null) {
+                               currentCompositeTransform = transform;
+                               if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
+                                       // we can only optimize CoGroupByKey for input size 2
+                                       currentCompositeTransform = null;
+                               }
+                       }
+               }
+               this.depth++;
+       }
+
+       @Override
+       public void leaveCompositeTransform(TransformTreeNode node) {
+               PTransform<?, ?> transform = node.getTransform();
+               if (transform != null && currentCompositeTransform == transform) {
+
+                       BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+                       if (translator != null) {
+                               System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+                               applyBatchTransform(transform, node, translator);
+                               currentCompositeTransform = null;
+                       } else {
+                               throw new IllegalStateException("Attempted to translate composite transform " +
+                                               "but no translator was found: " + currentCompositeTransform);
+                       }
+               }
+               this.depth--;
+               System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+       }
+
+       @Override
+       public void visitTransform(TransformTreeNode node) {
+               System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+               if (currentCompositeTransform != null) {
+                       // ignore it
+                       return;
+               }
+
+               // Get the transformation corresponding to the node we are
+               // currently visiting and translate it into its Flink alternative.
+
+               PTransform<?, ?> transform = node.getTransform();
+               BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+               if (translator == null) {
+                       System.out.println(node.getTransform().getClass());
+                       throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+               }
+               applyBatchTransform(transform, node, translator);
+       }
+
+       @Override
+       public void visitValue(PValue value, TransformTreeNode producer) {
+               // do nothing here
+       }
+
+       private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+               if (this.batchContext == null) {
+                       throw new IllegalStateException("The FlinkPipelineTranslator is not yet initialized.");
+               }
+
+               @SuppressWarnings("unchecked")
+               T typedTransform = (T) transform;
+
+               @SuppressWarnings("unchecked")
+               BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+               // create the applied PTransform on the batchContext
+               batchContext.setCurrentTransform(AppliedPTransform.of(
+                               node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+               typedTranslator.translateNode(typedTransform, batchContext);
+       }
+
+       /**
+        * A translator of a {@link PTransform}.
+        */
+       public interface BatchTransformTranslator<Type extends PTransform> {
+               void translateNode(Type transform, FlinkBatchTranslationContext context);
+       }
+
+       private static String genSpaces(int n) {
+               StringBuilder s = new StringBuilder();
+               for (int i = 0; i < n; i++) {
+                       s.append("|   ");
+               }
+               return s.toString();
+       }
+
+       private static String formatNodeName(TransformTreeNode node) {
+               return node.toString().split("@")[1] + node.getTransform();
+       }
+}
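
To add support for a new transform, a translator implementing the BatchTransformTranslator interface above is registered in FlinkBatchTransformTranslators and picked up by visitTransform/leaveCompositeTransform. The snippet below is a self-contained toy model of that class-keyed dispatch, using simplified stand-in types rather than the SDK and runner classes.

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of the translator dispatch: translators are looked up by transform
    // class and applied if present; otherwise translation fails, mirroring the
    // UnsupportedOperationException thrown in visitTransform above.
    public class TranslatorDispatchSketch {

        interface Translator<T> {
            void translate(T transform);
        }

        static final Map<Class<?>, Translator<?>> TRANSLATORS = new HashMap<>();

        static <T> void register(Class<T> clazz, Translator<T> translator) {
            TRANSLATORS.put(clazz, translator);
        }

        @SuppressWarnings("unchecked")
        static <T> void visit(T transform) {
            Translator<T> translator = (Translator<T>) TRANSLATORS.get(transform.getClass());
            if (translator == null) {
                throw new UnsupportedOperationException(
                        "The transform " + transform + " is currently not supported.");
            }
            translator.translate(transform);
        }

        public static void main(String[] args) {
            register(String.class, t -> System.out.println("translated: " + t));
            visit("someTransform");  // dispatches to the translator registered for String
        }
    }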
