[FLINK-1560] [streaming] Streaming examples rework

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d1653
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d1653
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d1653

Branch: refs/heads/release-0.9.0-milestone-1
Commit: ed7d16534f55ca3ee43b2c5110c71bfa224e7144
Parents: d33b445
Author: szape <nemderogator...@gmail.com>
Authored: Wed Mar 25 09:26:01 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Tue Apr 7 16:09:37 2015 +0200

----------------------------------------------------------------------
 .../examples/iteration/IterateExample.java      | 159 ++++++++++++++-----
 .../streaming/examples/join/WindowJoin.java     |  87 ++++++++--
 .../ml/IncrementalLearningSkeleton.java         | 113 ++++++++++---
 .../socket/SocketTextStreamWordCount.java       |  28 ++--
 .../examples/twitter/TwitterStream.java         |  35 ++--
 .../windowing/TopSpeedWindowingExample.java     |  60 +++++--
 .../examples/wordcount/PojoExample.java         |   3 +-
 .../test/wordcount/WordCountITCase.java         |   4 +-
 8 files changed, 371 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index bbd5433..d9a8167 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -17,29 +17,32 @@
 
 package org.apache.flink.streaming.examples.iteration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Example illustrating iterations in Flink streaming.
- * 
+ * <p/>
  * <p>
  * The program sums up random numbers and counts additions it performs to reach
  * a specific threshold in an iterative streaming fashion.
  * </p>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to use:
  * <ul>
  * <li>streaming iterations,
@@ -59,35 +62,44 @@ public class IterateExample {
                        return;
                }
 
-               // set up input for the stream of (0,0) pairs
-               List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
-               for (int i = 0; i < 1000; i++) {
-                       input.add(new Tuple2<Double, Integer>(0., 0));
-               }
+               // set up input for the stream of integer pairs

-               // obtain execution environment and set setBufferTimeout(0) to enable
+               // obtain execution environment and set setBufferTimeout to 1 to enable
                // continuous flushing of the output buffers (lowest latency)
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                                .setBufferTimeout(1);
 
+               // create input stream of integer pairs
+               DataStream<Tuple2<Integer, Integer>> inputStream;
+               if(fileInput) {
+                       inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
+               } else {
+                       inputStream = env.addSource(new RandomFibonacciSource());
+               }
+
                // create an iterative data stream from the input with 5 second timeout
-               IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+               IterativeDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
                                .iterate(5000);
 
-               // apply the step function to add new random value to the tuple and to
+               // apply the step function to get the next Fibonacci number
                // increment the counter and split the output with the output selector
-               SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector());
+               SplitDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
+                               .split(new MySelector());

                // close the iteration by selecting the tuples that were directed to the
                // 'iterate' channel in the output selector
                it.closeWith(step.select("iterate"));

                // to produce the final output select the tuples directed to the
-               // 'output' channel then project it to the desired second field
-
-               DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
-
-               // emit result
+               // 'output' channel then get the input pairs that have the greatest iteration counter
+               // on a 1 second sliding window
+               DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
+                               .map(new OutputMap())
+                               .window(Time.of(1L, TimeUnit.SECONDS))
+                               .every(Time.of(500L, TimeUnit.MILLISECONDS))
+                               .maxBy(1).flatten();
+
+               // emit results
                if (fileOutput) {
                        numbers.writeAsText(outputPath, 1);
                } else {
@@ -103,57 +115,124 @@ public class IterateExample {
        // *************************************************************************
 
        /**
-        * Iteration step function which takes an input (Double , Integer) and
-        * produces an output (Double + random, Integer + 1).
+        * Generate random integer pairs from the range from 0 to BOUND/2
+        */
+       private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private Random rnd = new Random();
+
+               @Override
+               public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+                       while(true) {
+                               int first = rnd.nextInt(BOUND/2 - 1) + 1;
+                               int second = rnd.nextInt(BOUND/2 - 1) + 1;
+
+                               collector.collect(new Tuple2<Integer, Integer>(first, second));
+                               Thread.sleep(100L);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       // no cleanup needed
+               }
+       }
+
+       /**
+        * Generate random integer pairs from the range from 0 to BOUND/2
         */
-       public static class Step extends
-                       RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
+       private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
                private static final long serialVersionUID = 1L;
-               private transient Random rnd;
 
-               public void open(Configuration parameters) {
-                       rnd = new Random();
+               @Override
+               public Tuple2<Integer, Integer> map(String value) throws Exception {
+                       Thread.sleep(100L);
+                       String record = value.substring(1, value.length()-1);
+                       String[] splitted = record.split(",");
+                       return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
                }
+       }
+
+       /**
+        * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple
+        * A counter is attached to the tuple and incremented in every iteration step
+        */
+       public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+
+               @Override
+               public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
+                               Exception {
+                       return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0);
+               }
+       }
+
+       /**
+        * Iteration step function that calculates the next Fibonacci number
+        */
+       public static class Step implements
+                       MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+               private static final long serialVersionUID = 1L;

                @Override
-               public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
-                       return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
+               public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
+                       return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
                }
        }
 
        /**
         * OutputSelector testing which tuple needs to be iterated again.
         */
-       public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
+       public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
                private static final long serialVersionUID = 1L;

                @Override
-               public Iterable<String> select(Tuple2<Double, Integer> value) {
+               public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
                        List<String> output = new ArrayList<String>();
-                       if (value.f0 > 100) {
-                               output.add("output");
-                       } else {
+                       if (value.f2 < BOUND && value.f3 < BOUND) {
                                output.add("iterate");
+                       } else {
+                               output.add("output");
                        }
+                       output.add("output");
                        return output;
                }
+       }
+
+       /**
+        * Giving back the input pair and the counter
+        */
+       public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {
 
+               @Override
+               public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
+                               Exception {
+                       return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
+               }
        }
 
        // *************************************************************************
        // UTIL METHODS
        // *************************************************************************
 
+       private static boolean fileInput = false;
        private static boolean fileOutput = false;
+       private static String inputPath;
        private static String outputPath;
+       private static final int BOUND = 100;
 
        private static boolean parseParameters(String[] args) {
 
                if (args.length > 0) {
                        // parse input arguments
-                       fileOutput = true;
                        if (args.length == 1) {
+                               fileOutput = true;
                                outputPath = args[0];
+                       } else if(args.length == 2) {
+                               fileInput = true;
+                               inputPath = args[0];
+                               fileOutput = true;
+                               outputPath = args[1];
                        } else {
                                System.err.println("Usage: IterateExample <result path>");
                                return false;
@@ -165,4 +244,4 @@ public class IterateExample {
                }
                return true;
        }
-}
+}
\ No newline at end of file
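
As a quick orientation for readers of this diff: the reworked IterateExample feeds integer pairs through the streaming iteration, advancing each pair Fibonacci-style until one of its values reaches BOUND and counting the steps taken. Below is a minimal, Flink-free sketch of that per-tuple logic; the class and method names are made up for illustration, and BOUND = 100 matches the example's constant.

public class FibonacciStepSketch {

    private static final int BOUND = 100;

    public static void main(String[] args) {
        // (1, 2) needs 9 step applications before a value reaches BOUND
        System.out.println(stepsUntilBound(1, 2));
    }

    // Mirrors Step + MySelector: advance (f2, f3) -> (f3, f2 + f3) and keep
    // iterating while both values stay below BOUND, counting the steps.
    static int stepsUntilBound(int first, int second) {
        int f2 = first;
        int f3 = second;
        int counter = 0;
        while (f2 < BOUND && f3 < BOUND) {
            int next = f2 + f3;
            f2 = f3;
            f3 = next;
            counter++;
        }
        return counter;
    }
}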

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index a5a9577..bfc59ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -17,10 +17,8 @@
 
 package org.apache.flink.streaming.examples.join;
 
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -28,17 +26,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 
+import java.util.Random;
+
 /**
  * Example illustrating join over sliding windows of streams in Flink.
- * 
+ * <p/>
  * <p>
  * his example will join two streams with a sliding window. One which emits
  * grades and one which emits salaries of people.
  * </p>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to:
  * <ul>
  * <li>do windowed joins,
@@ -51,6 +52,9 @@ public class WindowJoin {
        // PROGRAM
        // *************************************************************************
 
+       private static DataStream<Tuple2<String, Integer>> grades;
+       private static DataStream<Tuple2<String, Integer>> salaries;
+
        public static void main(String[] args) throws Exception {
 
                if (!parseParameters(args)) {
@@ -61,18 +65,17 @@ public class WindowJoin {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
                // connect to the data sources for grades and salaries
-               DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-               DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+               setInputStreams(env);
 
                // apply a temporal join over the two stream based on the names over one
                // second windows
                DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
-                                               .join(salaries)
-                                               .onWindow(1, TimeUnit.SECONDS)
-                                               .where(0)
-                                               .equalTo(0)
-                                               .with(new MyJoinFunction());
-               
+                               .join(salaries)
+                               .onWindow(1, new MyTimestamp(0), new MyTimestamp(0))
+                               .where(0)
+                               .equalTo(0)
+                               .with(new MyJoinFunction());
+
                // emit result
                if (fileOutput) {
                        joinedStream.writeAsText(outputPath, 1);
@@ -88,7 +91,7 @@ public class WindowJoin {
        // USER FUNCTIONS
        // *************************************************************************
 
-       private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+       private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
        private final static int GRADE_COUNT = 5;
        private final static int SALARY_MAX = 10000;
        private final static int SLEEP_TIME = 10;
@@ -154,6 +157,21 @@ public class WindowJoin {
                }
        }
 
+       public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> {
+
+               private String[] record;
+
+               public MySourceMap() {
+                       record = new String[2];
+               }
+
+               @Override
+               public Tuple2<String, Integer> map(String line) throws Exception {
+                       record = line.substring(1, line.length() - 1).split(",");
+                       return new Tuple2<String, Integer>(record[0], Integer.parseInt(record[1]));
+               }
+       }
+
        public static class MyJoinFunction
                        implements
                        JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
@@ -172,22 +190,47 @@ public class WindowJoin {
                }
        }
 
+       public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> {
+               private int counter;
+
+               public MyTimestamp(int starttime) {
+                       this.counter = starttime;
+               }
+
+               @Override
+               public long getTimestamp(Tuple2<String, Integer> value) {
+                       counter += SLEEP_TIME;
+                       return counter;
+               }
+       }
+
        // *************************************************************************
        // UTIL METHODS
        // *************************************************************************
 
+       private static boolean fileInput = false;
        private static boolean fileOutput = false;
+
+       private static String gradesPath;
+       private static String salariesPath;
        private static String outputPath;
 
        private static boolean parseParameters(String[] args) {
 
                if (args.length > 0) {
                        // parse input arguments
-                       fileOutput = true;
                        if (args.length == 1) {
+                               fileOutput = true;
                                outputPath = args[0];
+                       } else if (args.length == 3) {
+                               fileInput = true;
+                               fileOutput = true;
+                               gradesPath = args[0];
+                               salariesPath = args[1];
+                               outputPath = args[2];
                        } else {
-                               System.err.println("Usage: WindowJoin <result path>");
+                               System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 1> " +
+                                               "<result path>");
                                return false;
                        }
                } else {
@@ -197,4 +240,14 @@ public class WindowJoin {
                }
                return true;
        }
+
+       private static void setInputStreams(StreamExecutionEnvironment env) {
+               if (fileInput) {
+                       grades = env.readTextFile(gradesPath).map(new MySourceMap());
+                       salaries = env.readTextFile(salariesPath).map(new MySourceMap());
+               } else {
+                       grades = env.addSource(new GradeSource());
+                       salaries = env.addSource(new SalarySource());
+               }
+       }
 }
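
A note on the new file-input mode of WindowJoin: the records read from gradesPath and salariesPath are expected in the same "(name,value)" textual form that the example's sources would print, and MySourceMap parses them with a substring plus split. A small stand-alone sketch of that parsing step (ParseRecordSketch is a hypothetical name used only for this note):

public class ParseRecordSketch {

    public static void main(String[] args) {
        String line = "(john,5)";
        // strip the surrounding parentheses, then split on the comma
        String[] record = line.substring(1, line.length() - 1).split(",");
        String name = record[0];
        int value = Integer.parseInt(record[1]);
        System.out.println(name + " -> " + value);   // prints: john -> 5
    }
}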

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 26895f2..68b105a 100755
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,28 +17,27 @@
 
 package org.apache.flink.streaming.examples.ml;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 
 /**
  * Skeleton for incremental machine learning algorithm consisting of a
  * pre-computed model, which gets updated for the new inputs and new input data
  * for which the job provides predictions.
- * 
+ * <p/>
  * <p>
  * This may serve as a base of a number of algorithms, e.g. updating an
  * incremental Alternating Least Squares model while also providing the
  * predictions.
  * </p>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to use:
  * <ul>
  * <li>Connected streams
@@ -48,6 +47,9 @@ import org.apache.flink.util.Collector;
  */
 public class IncrementalLearningSkeleton {
 
+       private static DataStream<Integer> trainingData = null;
+       private static DataStream<Integer> newData = null;
+
        // *************************************************************************
        // PROGRAM
        // *************************************************************************
@@ -59,15 +61,15 @@ public class IncrementalLearningSkeleton {
                }
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               // env.setDegreeOfParallelism(1);
+               createSourceStreams(env);
 
                // build new model on every second of new data
-               DataStream<Double[]> model = env.addSource(new TrainingDataSource())
-                               .window(Time.of(5000, TimeUnit.MILLISECONDS))
+               DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp()))
                                .mapWindow(new PartialModelBuilder()).flatten();
 
-               // use partial model for prediction
-               DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model)
-                               .map(new Predictor());
+               // use partial model for newData
+               DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
 
                // emit result
                if (fileOutput) {
@@ -85,7 +87,7 @@ public class IncrementalLearningSkeleton {
        // *************************************************************************
 
        /**
-        * Feeds new data for prediction. By default it is implemented as constantly
+        * Feeds new data for newData. By default it is implemented as constantly
         * emitting the Integer 1 in a loop.
         */
        public static class NewDataSource implements SourceFunction<Integer> {
@@ -111,6 +113,34 @@ public class IncrementalLearningSkeleton {
        }
 
        /**
+        * Feeds new data for newData. By default it is implemented as constantly
+        * emitting the Integer 1 in a loop.
+        */
+       public static class FiniteNewDataSource implements SourceFunction<Integer> {
+               private static final long serialVersionUID = 1L;
+               private int counter;
+
+               @Override
+               public void run(Collector<Integer> collector) throws Exception {
+                       Thread.sleep(15);
+                       while (counter < 50) {
+                               collector.collect(getNewData());
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       // No cleanup needed
+               }
+
+               private Integer getNewData() throws InterruptedException {
+                       Thread.sleep(5);
+                       counter++;
+                       return 1;
+               }
+       }
+
+       /**
         * Feeds new training data for the partial model builder. By default it is
         * implemented as constantly emitting the Integer 1 in a loop.
         */
@@ -139,13 +169,50 @@ public class IncrementalLearningSkeleton {
        }
 
        /**
+        * Feeds new training data for the partial model builder. By default it is
+        * implemented as constantly emitting the Integer 1 in a loop.
+        */
+       public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
+               private static final long serialVersionUID = 1L;
+               private int counter = 0;
+
+               @Override
+               public void run(Collector<Integer> collector) throws Exception {
+                       while (counter < 8200) {
+                               collector.collect(getTrainingData());
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       // No cleanup needed
+               }
+
+               private Integer getTrainingData() throws InterruptedException {
+                       counter++;
+                       return 1;
+               }
+       }
+
+       public static class LinearTimestamp implements Timestamp<Integer> {
+               private static final long serialVersionUID = 1L;
+
+               private long counter = 0L;
+
+               @Override
+               public long getTimestamp(Integer value) {
+                       return counter += 10L;
+               }
+       }
+
+       /**
         * Builds up-to-date partial models on new training data.
         */
        public static class PartialModelBuilder implements WindowMapFunction<Integer, Double[]> {
                private static final long serialVersionUID = 1L;
 
                protected Double[] buildPartialModel(Iterable<Integer> values) {
-                       return new Double[] { 1. };
+                       return new Double[]{1.};
                }
 
                @Override
@@ -155,11 +222,11 @@ public class IncrementalLearningSkeleton {
        }
 
        /**
-        * Creates prediction using the model produced in batch-processing and the
+        * Creates newData using the model produced in batch-processing and the
         * up-to-date partial model.
-        * 
+        * <p/>
         * <p>
-        * By defaults emits the Integer 0 for every prediction and the Integer 1
+        * By defaults emits the Integer 0 for every newData and the Integer 1
         * for every model update.
         * </p>
         */
@@ -171,7 +238,7 @@ public class IncrementalLearningSkeleton {
 
                @Override
                public Integer map1(Integer value) {
-                       // Return prediction
+                       // Return newData
                        return predict(value);
                }
 
@@ -185,10 +252,10 @@ public class IncrementalLearningSkeleton {
 
                // pulls model built with batch-job on the old training data
                protected Double[] getBatchModel() {
-                       return new Double[] { 0. };
+                       return new Double[]{0.};
                }
 
-               // performs prediction using the two models
+               // performs newData using the two models
                protected Integer predict(Integer inTuple) {
                        return 0;
                }
@@ -220,4 +287,14 @@ public class IncrementalLearningSkeleton {
                }
                return true;
        }
+
+       public static void createSourceStreams(StreamExecutionEnvironment env) {
+               if (fileOutput) {
+                       trainingData = env.addSource(new FiniteTrainingDataSource());
+                       newData = env.addSource(new FiniteNewDataSource());
+               } else {
+                       trainingData = env.addSource(new TrainingDataSource());
+                       newData = env.addSource(new NewDataSource());
+               }
+       }
 }
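
The new LinearTimestamp above replaces wall-clock time with an artificial clock that advances by 10 units per element, so the Time.of(5000, new LinearTimestamp()) window closes after roughly 500 elements regardless of processing speed. A minimal stand-alone illustration of that counter, outside of any Flink API (LinearTimestampSketch is a hypothetical name used only for this note):

public class LinearTimestampSketch {

    private static long counter = 0L;

    // same idea as LinearTimestamp.getTimestamp: every element advances
    // the artificial clock by 10 units
    static long getTimestamp(int ignoredValue) {
        return counter += 10L;
    }

    public static void main(String[] args) {
        long windowSize = 5000L;
        int elements = 0;
        while (getTimestamp(1) <= windowSize) {
            elements++;
        }
        System.out.println(elements);   // 500 elements fill a 5000-unit window
    }
}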

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
index e9b60f4..1473097 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -26,26 +26,26 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
  * This example shows an implementation of WordCount with data from a text
  * socket. To run the example make sure that the service providing the text data
  * is already up and running.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * To start an example socket text stream on your local machine run netcat from
  * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
  * port number.
- * 
- * 
- * <p>
+ * <p/>
+ * <p/>
+ * <p/>
  * Usage:
  * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
  * <br>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to:
  * <ul>
  * <li>use StreamExecutionEnvironment.socketTextStream
  * <li>write a simple Flink program,
  * <li>write and use user-defined functions.
  * </ul>
- * 
+ *
  * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
  */
 public class SocketTextStreamWordCount {
@@ -60,14 +60,14 @@ public class SocketTextStreamWordCount {
                                .getExecutionEnvironment();
 
                // get input data
-               DataStream<String> text = env.socketTextStream(hostName, port);
+               DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
 
                DataStream<Tuple2<String, Integer>> counts =
-               // split up the lines in pairs (2-tuples) containing: (word,1)
-               text.flatMap(new Tokenizer())
-               // group by the tuple field "0" and sum up tuple field "1"
-                               .groupBy(0)
-                               .sum(1);
+                               // split up the lines in pairs (2-tuples) containing: (word,1)
+                               text.flatMap(new Tokenizer())
+                                               // group by the tuple field "0" and sum up tuple field "1"
+                                               .groupBy(0)
+                                               .sum(1);
 
                if (fileOutput) {
                        counts.writeAsText(outputPath, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 1901475..272381f 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.examples.twitter;
 
-import java.util.StringTokenizer;
-
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,26 +27,27 @@ import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
 
+import java.util.StringTokenizer;
+
 /**
  * Implements the "TwitterStream" program that computes a most used word
  * occurrence over JSON files in a streaming fashion.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * The input is a JSON text file with lines separated by newline characters.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * Usage: <code>TwitterStream &lt;text path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from
  * {@link TwitterStreamData}.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to:
  * <ul>
  * <li>acquire external data,
  * <li>use in-line defined functions,
  * <li>handle flattened stream inputs.
  * </ul>
- * 
  */
 public class TwitterStream {
 
@@ -70,9 +69,9 @@ public class TwitterStream {
                DataStream<String> streamSource = getTextDataStream(env);
 
                DataStream<Tuple2<String, Integer>> tweets = streamSource
-               // selecting English tweets and splitting to words
+                               // selecting English tweets and splitting to words
                                .flatMap(new SelectEnglishAndTokenizeFlatMap())
-                               // returning (word, 1)
+                                               // returning (word, 1)
                                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                                        private static final long serialVersionUID = 1L;
 
@@ -81,14 +80,14 @@ public class TwitterStream {
                                                return new Tuple2<String, Integer>(value, 1);
                                        }
                                })
-                               // group by words and sum their occurence
+                                               // group by words and sum their occurence
                                .groupBy(0).sum(1)
-                               // select word with maximum occurence
+                                               // select word with maximum occurence
                                .flatMap(new SelectMaxOccurence());
 
                // emit result
                if (fileOutput) {
-                       tweets.writeAsText(outputPath, 1);
+                       tweets.writeAsText(outputPath, 1L);
                } else {
                        tweets.print();
                }
@@ -103,7 +102,7 @@ public class TwitterStream {
 
        /**
         * Makes sentences from English tweets.
-        * 
+        * <p/>
         * <p>
         * Implements a string tokenizer that splits sentences into words as a
         * user-defined FlatMapFunction. The function takes a line (String) and
@@ -167,6 +166,7 @@ public class TwitterStream {
        // UTIL METHODS
        // *************************************************************************
 
+       private static boolean fileInput = false;
        private static boolean fileOutput = false;
        private static String textPath;
        private static String outputPath;
@@ -176,8 +176,11 @@ public class TwitterStream {
                        // parse input arguments
                        fileOutput = true;
                        if (args.length == 2) {
+                               fileInput = true;
                                textPath = args[0];
                                outputPath = args[1];
+                       } else if (args.length == 1) {
+                               outputPath = args[0];
                        } else {
                                System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
                                return false;
@@ -191,7 +194,7 @@ public class TwitterStream {
        }
 
        private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-               if (fileOutput) {
+               if (fileInput) {
                        // read the text file from given input path
                        return env.readTextFile(textPath);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 311c6b2..bf3802b 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,11 +25,11 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 
 import java.util.Arrays;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 /**
  * An example of grouped stream windowing where different eviction and trigger
@@ -45,13 +46,17 @@ public class TopSpeedWindowingExample {
                        return;
                }
 
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-               @SuppressWarnings({ "rawtypes", "serial" })
-               DataStream topSpeeds = env
-                               .addSource(CarSource.create(numOfCars))
-                               .groupBy(0)
-                               .window(Time.of(evictionSec, TimeUnit.SECONDS))
+               @SuppressWarnings({"rawtypes", "serial"})
+               DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+               if (fileInput) {
+                       carData = env.readTextFile(inputPath).map(new ParseCarData());
+               } else {
+                       carData = env.addSource(CarSource.create(numOfCars));
+               }
+               DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0)
+                               .window(Time.of(evictionSec, new CarTimestamp()))
                                .every(Delta.of(triggerMeters,
                                                new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                                                        @Override
@@ -61,8 +66,12 @@ public class TopSpeedWindowingExample {
                                                                return newDataPoint.f2 - oldDataPoint.f2;
                                                        }
                                                }, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten();
+               if (fileOutput) {
+                       topSpeeds.writeAsText(outputPath);
+               } else {
+                       topSpeeds.print();
+               }
 
-               topSpeeds.print();
                env.execute("CarTopSpeedWindowingExample");
        }
 
@@ -99,8 +108,9 @@ public class TopSpeedWindowingExample {
                                                speeds[carId] = Math.max(0, speeds[carId] - 5);
                                        }
                                        distances[carId] += speeds[carId] / 3.6d;
-                                       collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId,
-                                                       speeds[carId], distances[carId], System.currentTimeMillis()));
+                                       Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+                                                       speeds[carId], distances[carId], System.currentTimeMillis());
+                                       collector.collect(record);
                                }
                        }
                }
@@ -110,9 +120,34 @@ public class TopSpeedWindowingExample {
                }
        }
 
+       private static class ParseCarData extends
+                       RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple4<Integer, Integer, Double, Long> map(String record) {
+                       String rawData = record.substring(1, record.length() - 1);
+                       String[] data = rawData.split(",");
+                       return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]),
+                                       Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
+               }
+       }
+
+       private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> {
+
+               @Override
+               public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) {
+                       return value.f3;
+               }
+       }
+
+       private static boolean fileInput = false;
+       private static boolean fileOutput = false;
        private static int numOfCars = 2;
        private static int evictionSec = 10;
        private static double triggerMeters = 50;
+       private static String inputPath;
+       private static String outputPath;
 
        private static boolean parseParameters(String[] args) {
 
@@ -121,6 +156,11 @@ public class TopSpeedWindowingExample {
                                numOfCars = Integer.valueOf(args[0]);
                                evictionSec = Integer.valueOf(args[1]);
                                triggerMeters = Double.valueOf(args[2]);
+                       } else if (args.length == 2) {
+                               fileInput = true;
+                               fileOutput = true;
+                               inputPath = args[0];
+                               outputPath = args[1];
                        } else {
                                System.err
                                                .println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
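
The window in this example is evaluated by a delta trigger: a new computation fires whenever the covered distance (field f2) has grown by more than triggerMeters since the element that caused the previous trigger. A small stand-alone sketch of that rule (DeltaTriggerSketch and its sample values are made up for this note):

public class DeltaTriggerSketch {

    public static void main(String[] args) {
        double triggerMeters = 50;
        double lastTriggerDistance = 0;
        double[] distances = {12.5, 30.0, 55.0, 80.0, 120.0};

        for (double distance : distances) {
            // same comparison the DeltaFunction in the example expresses:
            // newDataPoint.f2 - oldDataPoint.f2 > triggerMeters
            if (distance - lastTriggerDistance > triggerMeters) {
                System.out.println("trigger at distance " + distance);
                lastTriggerDistance = distance;
            }
        }
    }
}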

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
index d582c10..5ff3fc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.examples.wordcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
index 51da0d6..85e1dca 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.examples.test.wordcount;
 
 import org.apache.flink.streaming.examples.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
 
 public class WordCountITCase extends StreamingProgramTestBase {
 
@@ -40,6 +40,6 @@ public class WordCountITCase extends StreamingProgramTestBase {
 
        @Override
        protected void testProgram() throws Exception {
-               WordCount.main(new String[] { textPath, resultPath });
+               WordCount.main(new String[]{textPath, resultPath});
        }
 }
