[FLINK-1560] [streaming] Streaming examples rework
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d1653 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d1653 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d1653 Branch: refs/heads/release-0.9.0-milestone-1 Commit: ed7d16534f55ca3ee43b2c5110c71bfa224e7144 Parents: d33b445 Author: szape <nemderogator...@gmail.com> Authored: Wed Mar 25 09:26:01 2015 +0100 Committer: mbalassi <mbala...@apache.org> Committed: Tue Apr 7 16:09:37 2015 +0200 ---------------------------------------------------------------------- .../examples/iteration/IterateExample.java | 159 ++++++++++++++----- .../streaming/examples/join/WindowJoin.java | 87 ++++++++-- .../ml/IncrementalLearningSkeleton.java | 113 ++++++++++--- .../socket/SocketTextStreamWordCount.java | 28 ++-- .../examples/twitter/TwitterStream.java | 35 ++-- .../windowing/TopSpeedWindowingExample.java | 60 +++++-- .../examples/wordcount/PojoExample.java | 3 +- .../test/wordcount/WordCountITCase.java | 4 +- 8 files changed, 371 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index bbd5433..d9a8167 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -17,29 +17,32 @@ package org.apache.flink.streaming.examples.iteration; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; /** * Example illustrating iterations in Flink streaming. - * + * <p/> * <p> * The program sums up random numbers and counts additions it performs to reach * a specific threshold in an iterative streaming fashion. * </p> - * - * <p> + * <p/> + * <p/> * This example shows how to use: * <ul> * <li>streaming iterations, @@ -59,35 +62,44 @@ public class IterateExample { return; } - // set up input for the stream of (0,0) pairs - List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>(); - for (int i = 0; i < 1000; i++) { - input.add(new Tuple2<Double, Integer>(0., 0)); - } + // set up input for the stream of integer pairs - // obtain execution environment and set setBufferTimeout(0) to enable + // obtain execution environment and set setBufferTimeout to 1 to enable // continuous flushing of the output buffers (lowest latency) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() .setBufferTimeout(1); + // create input stream of integer pairs + DataStream<Tuple2<Integer, Integer>> inputStream; + if(fileInput) { + inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); + } else { + inputStream = env.addSource(new RandomFibonacciSource()); + } + // create an iterative data stream from the input with 5 second timeout - IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle() + IterativeDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap()) .iterate(5000); - // apply the step function to add new random value to the tuple and to + // apply the step function to get the next Fibonacci number // increment the counter and split the output with the output selector - SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector()); + SplitDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step()) + .split(new MySelector()); // close the iteration by selecting the tuples that were directed to the // 'iterate' channel in the output selector it.closeWith(step.select("iterate")); // to produce the final output select the tuples directed to the - // 'output' channel then project it to the desired second field - - DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class); - - // emit result + // 'output' channel then get the input pairs that have the greatest iteration counter + // on a 1 second sliding window + DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output") + .map(new OutputMap()) + .window(Time.of(1L, TimeUnit.SECONDS)) + .every(Time.of(500L, TimeUnit.MILLISECONDS)) + .maxBy(1).flatten(); + + // emit results if (fileOutput) { numbers.writeAsText(outputPath, 1); } else { @@ -103,57 +115,124 @@ public class IterateExample { // ************************************************************************* /** - * Iteration step function which takes an input (Double , Integer) and - * produces an output (Double + random, Integer + 1). + * Generate random integer pairs from the range from 0 to BOUND/2 + */ + private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> { + private static final long serialVersionUID = 1L; + + private Random rnd = new Random(); + + @Override + public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception { + while(true) { + int first = rnd.nextInt(BOUND/2 - 1) + 1; + int second = rnd.nextInt(BOUND/2 - 1) + 1; + + collector.collect(new Tuple2<Integer, Integer>(first, second)); + Thread.sleep(100L); + } + } + + @Override + public void cancel() { + // no cleanup needed + } + } + + /** + * Generate random integer pairs from the range from 0 to BOUND/2 */ - public static class Step extends - RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> { + private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; - private transient Random rnd; - public void open(Configuration parameters) { - rnd = new Random(); + @Override + public Tuple2<Integer, Integer> map(String value) throws Exception { + Thread.sleep(100L); + String record = value.substring(1, value.length()-1); + String[] splitted = record.split(","); + return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); } + } + + /** + * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple + * A counter is attached to the tuple and incremented in every iteration step + */ + public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> { + + @Override + public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws + Exception { + return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0); + } + } + + /** + * Iteration step function that calculates the next Fibonacci number + */ + public static class Step implements + MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> { + private static final long serialVersionUID = 1L; @Override - public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception { - return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1); + public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception { + return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); } } /** * OutputSelector testing which tuple needs to be iterated again. */ - public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> { + public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> { private static final long serialVersionUID = 1L; @Override - public Iterable<String> select(Tuple2<Double, Integer> value) { + public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) { List<String> output = new ArrayList<String>(); - if (value.f0 > 100) { - output.add("output"); - } else { + if (value.f2 < BOUND && value.f3 < BOUND) { output.add("iterate"); + } else { + output.add("output"); } + output.add("output"); return output; } + } + + /** + * Giving back the input pair and the counter + */ + public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> { + @Override + public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws + Exception { + return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4); + } } // ************************************************************************* // UTIL METHODS // ************************************************************************* + private static boolean fileInput = false; private static boolean fileOutput = false; + private static String inputPath; private static String outputPath; + private static final int BOUND = 100; private static boolean parseParameters(String[] args) { if (args.length > 0) { // parse input arguments - fileOutput = true; if (args.length == 1) { + fileOutput = true; outputPath = args[0]; + } else if(args.length == 2) { + fileInput = true; + inputPath = args[0]; + fileOutput = true; + outputPath = args[1]; } else { System.err.println("Usage: IterateExample <result path>"); return false; @@ -165,4 +244,4 @@ public class IterateExample { } return true; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index a5a9577..bfc59ec 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -17,10 +17,8 @@ package org.apache.flink.streaming.examples.join; -import java.util.Random; -import java.util.concurrent.TimeUnit; - import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -28,17 +26,20 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.source.RichSourceFunction; import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.util.Collector; +import java.util.Random; + /** * Example illustrating join over sliding windows of streams in Flink. - * + * <p/> * <p> * his example will join two streams with a sliding window. One which emits * grades and one which emits salaries of people. * </p> - * - * <p> + * <p/> + * <p/> * This example shows how to: * <ul> * <li>do windowed joins, @@ -51,6 +52,9 @@ public class WindowJoin { // PROGRAM // ************************************************************************* + private static DataStream<Tuple2<String, Integer>> grades; + private static DataStream<Tuple2<String, Integer>> salaries; + public static void main(String[] args) throws Exception { if (!parseParameters(args)) { @@ -61,18 +65,17 @@ public class WindowJoin { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // connect to the data sources for grades and salaries - DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource()); - DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource()); + setInputStreams(env); // apply a temporal join over the two stream based on the names over one // second windows DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades - .join(salaries) - .onWindow(1, TimeUnit.SECONDS) - .where(0) - .equalTo(0) - .with(new MyJoinFunction()); - + .join(salaries) + .onWindow(1, new MyTimestamp(0), new MyTimestamp(0)) + .where(0) + .equalTo(0) + .with(new MyJoinFunction()); + // emit result if (fileOutput) { joinedStream.writeAsText(outputPath, 1); @@ -88,7 +91,7 @@ public class WindowJoin { // USER FUNCTIONS // ************************************************************************* - private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" }; + private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"}; private final static int GRADE_COUNT = 5; private final static int SALARY_MAX = 10000; private final static int SLEEP_TIME = 10; @@ -154,6 +157,21 @@ public class WindowJoin { } } + public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> { + + private String[] record; + + public MySourceMap() { + record = new String[2]; + } + + @Override + public Tuple2<String, Integer> map(String line) throws Exception { + record = line.substring(1, line.length() - 1).split(","); + return new Tuple2<String, Integer>(record[0], Integer.parseInt(record[1])); + } + } + public static class MyJoinFunction implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> { @@ -172,22 +190,47 @@ public class WindowJoin { } } + public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> { + private int counter; + + public MyTimestamp(int starttime) { + this.counter = starttime; + } + + @Override + public long getTimestamp(Tuple2<String, Integer> value) { + counter += SLEEP_TIME; + return counter; + } + } + // ************************************************************************* // UTIL METHODS // ************************************************************************* + private static boolean fileInput = false; private static boolean fileOutput = false; + + private static String gradesPath; + private static String salariesPath; private static String outputPath; private static boolean parseParameters(String[] args) { if (args.length > 0) { // parse input arguments - fileOutput = true; if (args.length == 1) { + fileOutput = true; outputPath = args[0]; + } else if (args.length == 3) { + fileInput = true; + fileOutput = true; + gradesPath = args[0]; + salariesPath = args[1]; + outputPath = args[2]; } else { - System.err.println("Usage: WindowJoin <result path>"); + System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 1> " + + "<result path>"); return false; } } else { @@ -197,4 +240,14 @@ public class WindowJoin { } return true; } + + private static void setInputStreams(StreamExecutionEnvironment env) { + if (fileInput) { + grades = env.readTextFile(gradesPath).map(new MySourceMap()); + salaries = env.readTextFile(salariesPath).map(new MySourceMap()); + } else { + grades = env.addSource(new GradeSource()); + salaries = env.addSource(new SalarySource()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index 26895f2..68b105a 100755 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -17,28 +17,27 @@ package org.apache.flink.streaming.examples.ml; -import java.util.concurrent.TimeUnit; - import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.WindowMapFunction; import org.apache.flink.streaming.api.function.co.CoMapFunction; import org.apache.flink.streaming.api.function.source.SourceFunction; import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.util.Collector; /** * Skeleton for incremental machine learning algorithm consisting of a * pre-computed model, which gets updated for the new inputs and new input data * for which the job provides predictions. - * + * <p/> * <p> * This may serve as a base of a number of algorithms, e.g. updating an * incremental Alternating Least Squares model while also providing the * predictions. * </p> - * - * <p> + * <p/> + * <p/> * This example shows how to use: * <ul> * <li>Connected streams @@ -48,6 +47,9 @@ import org.apache.flink.util.Collector; */ public class IncrementalLearningSkeleton { + private static DataStream<Integer> trainingData = null; + private static DataStream<Integer> newData = null; + // ************************************************************************* // PROGRAM // ************************************************************************* @@ -59,15 +61,15 @@ public class IncrementalLearningSkeleton { } StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // env.setDegreeOfParallelism(1); + createSourceStreams(env); // build new model on every second of new data - DataStream<Double[]> model = env.addSource(new TrainingDataSource()) - .window(Time.of(5000, TimeUnit.MILLISECONDS)) + DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp())) .mapWindow(new PartialModelBuilder()).flatten(); - // use partial model for prediction - DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model) - .map(new Predictor()); + // use partial model for newData + DataStream<Integer> prediction = newData.connect(model).map(new Predictor()); // emit result if (fileOutput) { @@ -85,7 +87,7 @@ public class IncrementalLearningSkeleton { // ************************************************************************* /** - * Feeds new data for prediction. By default it is implemented as constantly + * Feeds new data for newData. By default it is implemented as constantly * emitting the Integer 1 in a loop. */ public static class NewDataSource implements SourceFunction<Integer> { @@ -111,6 +113,34 @@ public class IncrementalLearningSkeleton { } /** + * Feeds new data for newData. By default it is implemented as constantly + * emitting the Integer 1 in a loop. + */ + public static class FiniteNewDataSource implements SourceFunction<Integer> { + private static final long serialVersionUID = 1L; + private int counter; + + @Override + public void run(Collector<Integer> collector) throws Exception { + Thread.sleep(15); + while (counter < 50) { + collector.collect(getNewData()); + } + } + + @Override + public void cancel() { + // No cleanup needed + } + + private Integer getNewData() throws InterruptedException { + Thread.sleep(5); + counter++; + return 1; + } + } + + /** * Feeds new training data for the partial model builder. By default it is * implemented as constantly emitting the Integer 1 in a loop. */ @@ -139,13 +169,50 @@ public class IncrementalLearningSkeleton { } /** + * Feeds new training data for the partial model builder. By default it is + * implemented as constantly emitting the Integer 1 in a loop. + */ + public static class FiniteTrainingDataSource implements SourceFunction<Integer> { + private static final long serialVersionUID = 1L; + private int counter = 0; + + @Override + public void run(Collector<Integer> collector) throws Exception { + while (counter < 8200) { + collector.collect(getTrainingData()); + } + } + + @Override + public void cancel() { + // No cleanup needed + } + + private Integer getTrainingData() throws InterruptedException { + counter++; + return 1; + } + } + + public static class LinearTimestamp implements Timestamp<Integer> { + private static final long serialVersionUID = 1L; + + private long counter = 0L; + + @Override + public long getTimestamp(Integer value) { + return counter += 10L; + } + } + + /** * Builds up-to-date partial models on new training data. */ public static class PartialModelBuilder implements WindowMapFunction<Integer, Double[]> { private static final long serialVersionUID = 1L; protected Double[] buildPartialModel(Iterable<Integer> values) { - return new Double[] { 1. }; + return new Double[]{1.}; } @Override @@ -155,11 +222,11 @@ public class IncrementalLearningSkeleton { } /** - * Creates prediction using the model produced in batch-processing and the + * Creates newData using the model produced in batch-processing and the * up-to-date partial model. - * + * <p/> * <p> - * By defaults emits the Integer 0 for every prediction and the Integer 1 + * By defaults emits the Integer 0 for every newData and the Integer 1 * for every model update. * </p> */ @@ -171,7 +238,7 @@ public class IncrementalLearningSkeleton { @Override public Integer map1(Integer value) { - // Return prediction + // Return newData return predict(value); } @@ -185,10 +252,10 @@ public class IncrementalLearningSkeleton { // pulls model built with batch-job on the old training data protected Double[] getBatchModel() { - return new Double[] { 0. }; + return new Double[]{0.}; } - // performs prediction using the two models + // performs newData using the two models protected Integer predict(Integer inTuple) { return 0; } @@ -220,4 +287,14 @@ public class IncrementalLearningSkeleton { } return true; } + + public static void createSourceStreams(StreamExecutionEnvironment env) { + if (fileOutput) { + trainingData = env.addSource(new FiniteTrainingDataSource()); + newData = env.addSource(new FiniteNewDataSource()); + } else { + trainingData = env.addSource(new TrainingDataSource()); + newData = env.addSource(new NewDataSource()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java index e9b60f4..1473097 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java @@ -26,26 +26,26 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; * This example shows an implementation of WordCount with data from a text * socket. To run the example make sure that the service providing the text data * is already up and running. - * - * <p> + * <p/> + * <p/> * To start an example socket text stream on your local machine run netcat from * a command line: <code>nc -lk 9999</code>, where the parameter specifies the * port number. - * - * - * <p> + * <p/> + * <p/> + * <p/> * Usage: * <code>SocketTextStreamWordCount <hostname> <port> <result path></code> * <br> - * - * <p> + * <p/> + * <p/> * This example shows how to: * <ul> * <li>use StreamExecutionEnvironment.socketTextStream * <li>write a simple Flink program, * <li>write and use user-defined functions. * </ul> - * + * * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a> */ public class SocketTextStreamWordCount { @@ -60,14 +60,14 @@ public class SocketTextStreamWordCount { .getExecutionEnvironment(); // get input data - DataStream<String> text = env.socketTextStream(hostName, port); + DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0); DataStream<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); if (fileOutput) { counts.writeAsText(outputPath, 1); http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java index 1901475..272381f 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.examples.twitter; -import java.util.StringTokenizer; - import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -29,26 +27,27 @@ import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData; import org.apache.flink.util.Collector; import org.apache.sling.commons.json.JSONException; +import java.util.StringTokenizer; + /** * Implements the "TwitterStream" program that computes a most used word * occurrence over JSON files in a streaming fashion. - * - * <p> + * <p/> + * <p/> * The input is a JSON text file with lines separated by newline characters. - * - * <p> + * <p/> + * <p/> * Usage: <code>TwitterStream <text path></code><br> * If no parameters are provided, the program is run with default data from * {@link TwitterStreamData}. - * - * <p> + * <p/> + * <p/> * This example shows how to: * <ul> * <li>acquire external data, * <li>use in-line defined functions, * <li>handle flattened stream inputs. * </ul> - * */ public class TwitterStream { @@ -70,9 +69,9 @@ public class TwitterStream { DataStream<String> streamSource = getTextDataStream(env); DataStream<Tuple2<String, Integer>> tweets = streamSource - // selecting English tweets and splitting to words + // selecting English tweets and splitting to words .flatMap(new SelectEnglishAndTokenizeFlatMap()) - // returning (word, 1) + // returning (word, 1) .map(new MapFunction<String, Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @@ -81,14 +80,14 @@ public class TwitterStream { return new Tuple2<String, Integer>(value, 1); } }) - // group by words and sum their occurence + // group by words and sum their occurence .groupBy(0).sum(1) - // select word with maximum occurence + // select word with maximum occurence .flatMap(new SelectMaxOccurence()); // emit result if (fileOutput) { - tweets.writeAsText(outputPath, 1); + tweets.writeAsText(outputPath, 1L); } else { tweets.print(); } @@ -103,7 +102,7 @@ public class TwitterStream { /** * Makes sentences from English tweets. - * + * <p/> * <p> * Implements a string tokenizer that splits sentences into words as a * user-defined FlatMapFunction. The function takes a line (String) and @@ -167,6 +166,7 @@ public class TwitterStream { // UTIL METHODS // ************************************************************************* + private static boolean fileInput = false; private static boolean fileOutput = false; private static String textPath; private static String outputPath; @@ -176,8 +176,11 @@ public class TwitterStream { // parse input arguments fileOutput = true; if (args.length == 2) { + fileInput = true; textPath = args[0]; outputPath = args[1]; + } else if (args.length == 1) { + outputPath = args[0]; } else { System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>"); return false; @@ -191,7 +194,7 @@ public class TwitterStream { } private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) { - if (fileOutput) { + if (fileInput) { // read the text file from given input path return env.readTextFile(textPath); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java index 311c6b2..bf3802b 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.windowing; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -24,11 +25,11 @@ import org.apache.flink.streaming.api.function.source.SourceFunction; import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.Random; -import java.util.concurrent.TimeUnit; /** * An example of grouped stream windowing where different eviction and trigger @@ -45,13 +46,17 @@ public class TopSpeedWindowingExample { return; } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - @SuppressWarnings({ "rawtypes", "serial" }) - DataStream topSpeeds = env - .addSource(CarSource.create(numOfCars)) - .groupBy(0) - .window(Time.of(evictionSec, TimeUnit.SECONDS)) + @SuppressWarnings({"rawtypes", "serial"}) + DataStream<Tuple4<Integer, Integer, Double, Long>> carData; + if (fileInput) { + carData = env.readTextFile(inputPath).map(new ParseCarData()); + } else { + carData = env.addSource(CarSource.create(numOfCars)); + } + DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0) + .window(Time.of(evictionSec, new CarTimestamp())) .every(Delta.of(triggerMeters, new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() { @Override @@ -61,8 +66,12 @@ public class TopSpeedWindowingExample { return newDataPoint.f2 - oldDataPoint.f2; } }, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten(); + if (fileOutput) { + topSpeeds.writeAsText(outputPath); + } else { + topSpeeds.print(); + } - topSpeeds.print(); env.execute("CarTopSpeedWindowingExample"); } @@ -99,8 +108,9 @@ public class TopSpeedWindowingExample { speeds[carId] = Math.max(0, speeds[carId] - 5); } distances[carId] += speeds[carId] / 3.6d; - collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId, - speeds[carId], distances[carId], System.currentTimeMillis())); + Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId, + speeds[carId], distances[carId], System.currentTimeMillis()); + collector.collect(record); } } } @@ -110,9 +120,34 @@ public class TopSpeedWindowingExample { } } + private static class ParseCarData extends + RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple4<Integer, Integer, Double, Long> map(String record) { + String rawData = record.substring(1, record.length() - 1); + String[] data = rawData.split(","); + return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]), + Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3])); + } + } + + private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> { + + @Override + public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) { + return value.f3; + } + } + + private static boolean fileInput = false; + private static boolean fileOutput = false; private static int numOfCars = 2; private static int evictionSec = 10; private static double triggerMeters = 50; + private static String inputPath; + private static String outputPath; private static boolean parseParameters(String[] args) { @@ -121,6 +156,11 @@ public class TopSpeedWindowingExample { numOfCars = Integer.valueOf(args[0]); evictionSec = Integer.valueOf(args[1]); triggerMeters = Double.valueOf(args[2]); + } else if (args.length == 2) { + fileInput = true; + fileOutput = true; + inputPath = args[0]; + outputPath = args[1]; } else { System.err .println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>"); http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java index d582c10..5ff3fc1 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.examples.wordcount; import org.apache.flink.api.common.functions.FlatMapFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java index 51da0d6..85e1dca 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.examples.test.wordcount; import org.apache.flink.streaming.examples.wordcount.WordCount; -import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; public class WordCountITCase extends StreamingProgramTestBase { @@ -40,6 +40,6 @@ public class WordCountITCase extends StreamingProgramTestBase { @Override protected void testProgram() throws Exception { - WordCount.main(new String[] { textPath, resultPath }); + WordCount.main(new String[]{textPath, resultPath}); } }