[streaming] Windowing examples cleanup Removed obsolete examples Updated StockPrices example Updated build
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b402f43 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b402f43 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b402f43 Branch: refs/heads/master Commit: 6b402f43d01495fd9a6bea1c80b2b2b50393b92d Parents: b5752a7 Author: mbalassi <[email protected]> Authored: Mon Feb 9 18:17:20 2015 +0100 Committer: mbalassi <[email protected]> Committed: Tue Feb 10 08:51:00 2015 +0100 ---------------------------------------------------------------------- .../flink-streaming-examples/pom.xml | 72 ++------- .../examples/windowing/DeltaExtractExample.java | 125 --------------- .../windowing/MultiplePoliciesExample.java | 139 ----------------- .../examples/windowing/SlidingExample.java | 139 ----------------- .../examples/windowing/StockPrices.java | 69 ++++++++- .../windowing/TimeWindowingExample.java | 152 ------------------- .../scala/examples/join/WindowJoin.scala | 74 +++++++++ .../scala/examples/windowing/StockPrices.scala | 52 +++++++ .../scala/examples/windowing/WindowJoin.scala | 73 --------- 9 files changed, 207 insertions(+), 688 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml index 3661726..6a5ae36 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml +++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml @@ -258,101 +258,57 @@ under the License. 
</configuration> </execution> - <!-- DeltaExract --> + <!-- StockPrices --> <execution> - <id>DeltaExract</id> + <id>StockPrices</id> <phase>package</phase> <goals> <goal>jar</goal> </goals> <configuration> - <classifier>DeltaExract</classifier> + <classifier>StockPrices</classifier> <archive> <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class> + <program-class>org.apache.flink.streaming.examples.windowing.StockPrices</program-class> </manifestEntries> </archive> <includes> - <include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include> + <include>org/apache/flink/streaming/examples/windowing/StockPrices.class</include> + <include>org/apache/flink/streaming/examples/windowing/StockPrices$*.class</include> </includes> </configuration> </execution> - <!-- MultiplePolicies --> + <!-- TopSpeedWindowing --> <execution> - <id>MultiplePolicies</id> + <id>TopSpeedWindowing</id> <phase>package</phase> <goals> <goal>jar</goal> </goals> <configuration> - <classifier>MultiplePolicies</classifier> + <classifier>TopSpeedWindowing</classifier> <archive> <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class> + <program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class> </manifestEntries> </archive> <includes> - <include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include> + <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include> + <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include> </includes> </configuration> </execution> - <!-- SlidingExample --> - <execution> - <id>SlidingExample</id> - 
<phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>SlidingExample</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include> - </includes> - </configuration> - </execution> - - <!-- TimeWindowing --> - <execution> - <id>TimeWindowing</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>TimeWindowing</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include> - </includes> - </configuration> - </execution> </executions> </plugin> -<!-- Scala Compiler --> + + <!-- Scala Compiler --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java deleted file mode 100644 index d6a9ac0..0000000 --- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.deltafunction.EuclideanDistance; -import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple; -import org.apache.flink.streaming.api.windowing.helper.Count; -import org.apache.flink.streaming.api.windowing.helper.Delta; -import org.apache.flink.util.Collector; - -/** - * This example gives an impression about how to use delta policies. It also - * shows how extractors can be used. 
- */ -public class DeltaExtractExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - DataStream dstream = env - .addSource(new CountingSource()) - .window(Delta.of(1.2, new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3( - 0d, 0d, "foo"))).every(Count.of(2)).reduce(new ConcatStrings()); - - // emit result - if (fileOutput) { - dstream.writeAsText(outputPath, 1); - } else { - dstream.print(); - } - - // execute the program - env.execute("Delta Extract Example"); - - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - private static class CountingSource implements SourceFunction<Tuple3<Double, Double, String>> { - private static final long serialVersionUID = 1L; - - private int counter = 0; - - @Override - public void invoke(Collector<Tuple3<Double, Double, String>> collector) throws Exception { - while (true) { - if (counter > 9999) { - counter = 0; - } - collector.collect(new Tuple3<Double, Double, String>((double) counter, - (double) counter + 1, "V" + counter++)); - } - } - } - - private static final class ConcatStrings implements - ReduceFunction<Tuple3<Double, Double, String>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple3<Double, Double, String> reduce(Tuple3<Double, Double, String> value1, - Tuple3<Double, Double, String> value2) throws Exception { - return new Tuple3<Double, Double, String>(value1.f0, value2.f1, value1.f2 + "|" - + value2.f2); - } - } - - // 
************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: DeltaExtractExample <result path>"); - return false; - } - } else { - System.out.println("Executing DeltaExtractExample with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: DeltaExtractExample <result path>"); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java deleted file mode 100644 index 48783f2..0000000 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Count; -import org.apache.flink.util.Collector; - -/** - * This example uses count based tumbling windowing with multiple eviction - * policies at the same time. 
- */ -public class MultiplePoliciesExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream<String> stream = env.addSource(new BasicSource()) - .groupBy(new KeySelector<String, String>(){ - private static final long serialVersionUID = 1L; - @Override - public String getKey(String value) throws Exception { - return value; - } - }) - .window(Count.of(2)) - .every(Count.of(3), Count.of(5)) - .reduceGroup(new Concat()); - - // emit result - if (fileOutput) { - stream.writeAsText(outputPath, 1); - } else { - stream.print(); - } - - // execute the program - env.execute("Multiple Policies Example"); - } - - /** - * This source function indefinitely provides String inputs for the - * topology. - */ - public static final class BasicSource implements SourceFunction<String> { - - private static final long serialVersionUID = 1L; - - private final static String STR_1 = new String("streaming"); - private final static String STR_2 = new String("flink"); - - @Override - public void invoke(Collector<String> out) throws Exception { - // continuous emit - while (true) { - out.collect(STR_1); - out.collect(STR_2); - } - } - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * This reduce function does a String concat. 
- */ - public static final class Concat implements GroupReduceFunction<String, String> { - - /** - * Auto generates version ID - */ - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterable<String> values, Collector<String> out) throws Exception { - String output = "|"; - for (String v : values) { - output = output + v + "|"; - } - out.collect(output); - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: MultiplePoliciesExample <result path>"); - return false; - } - } else { - System.out.println("Executing MultiplePoliciesExample with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: MultiplePoliciesExample <result path>"); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java deleted file mode 100644 index cf03477..0000000 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Count; -import org.apache.flink.util.Collector; - -/** - * This example uses count based sliding windows to illustrate different - * possibilities for the realization of sliding windows. Take a look on the code - * which is commented out to see different setups. 
- */ -public class SlidingExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - /* - * SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the - * buffer Resulting windows will have an overlap of 5 elements - */ - - // DataStream<String> stream = env.addSource(new CountingSource()) - // .window(Count.of(10)) - // .every(Count.of(5)) - // .reduce(new Concat()); - - /* - * ADVANCED-EXAMPLE: Use this to have the last element of the last - * window as first element of the next window while the window size is - * always 5 - */ - - DataStream<String> stream = env.addSource(new CountingSource()) - .window(Count.of(5) - .withDelete(4)) - .every(Count.of(4) - .startingAt(-1)) - .reduce(new Concat()); - - // emit result - if (fileOutput) { - stream.writeAsText(outputPath, 1); - } else { - stream.print(); - } - - // execute the program - env.execute("Sliding Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - private static final class CountingSource implements SourceFunction<String> { - private static final long serialVersionUID = 1L; - - private int counter = 0; - - @Override - public void invoke(Collector<String> collector) throws Exception { - // continuous emit - while (true) { - if (counter > 9999) { - counter = 0; - } - collector.collect("V" + counter++); - } - } - } - - /** - * This reduce function does a String concat. 
- */ - private static final class Concat implements ReduceFunction<String> { - private static final long serialVersionUID = 1L; - - @Override - public String reduce(String value1, String value2) throws Exception { - return value1 + "|" + value2; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: SlidingExample <result path>"); - return false; - } - } else { - System.out.println("Executing SlidingExample with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: SlidingExample <result path>"); - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java index c60b5ca..ce5db4a 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java @@ -38,6 +38,36 @@ import java.util.Arrays; import java.util.Random; import java.util.concurrent.TimeUnit; +/** + * This 
example showcases a moderately complex Flink Streaming pipeline. + * It to computes statistics on stock market data that arrive continuously, + * and combines the stock market data with tweet streams. + * For a detailed explanation of the job, check out the blog post unrolling it. + * To run the example make sure that the service providing the text data + * is already up and running. + * + * <p> + * To start an example socket text stream on your local machine run netcat from + * a command line: <code>nc -lk 9999</code>, where the parameter specifies the + * port number. + * + * + * <p> + * Usage: + * <code>StockPrices <hostname> <port> <result path></code> + * <br> + * + * <p> + * This example shows how to: + * <ul> + * <li>merge and join data streams, + * <li>use different windowing policies, + * <li>define windowing aggregations. + * </ul> + * + * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a> + * @see <a href="http://flink.apache.org/news/2015/02/09/streaming-example.html">blogpost</a> + */ public class StockPrices { private static final ArrayList<String> SYMBOLS = new ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")); @@ -50,13 +80,17 @@ public class StockPrices { public static void main(String[] args) throws Exception { + if (!parseParameters(args)) { + return; + } + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Step 1 //Read a stream of stock prices from different sources and merge it into one stream //Read from a socket stream at map it to StockPrice objects - DataStream<StockPrice> socketStockStream = env.socketTextStream("localhost", 9999) + DataStream<StockPrice> socketStockStream = env.socketTextStream(hostName, port) .map(new MapFunction<String, StockPrice>() { private String[] tokens; @@ -155,7 +189,11 @@ public class StockPrices { .reduceGroup(new CorrelationReduce()) .setParallelism(1); - rollingCorrelation.print(); + if (fileOutput) { + 
rollingCorrelation.writeAsText(outputPath, 1); + } else { + rollingCorrelation.print(); + } env.execute("Stock stream"); @@ -338,4 +376,31 @@ public class StockPrices { } } + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String hostName; + private static int port; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + // parse input arguments + if (args.length == 3) { + fileOutput = true; + hostName = args[0]; + port = Integer.valueOf(args[1]); + outputPath = args[2]; + } else if (args.length == 2) { + hostName = args[0]; + port = Integer.valueOf(args[1]); + } else { + System.err.println("Usage: StockPrices <hostname> <port> [<output path>]"); + return false; + } + return true; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java deleted file mode 100644 index 622aa82..0000000 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing; - -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.RichSourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Count; -import org.apache.flink.streaming.api.windowing.helper.Time; -import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy; -import org.apache.flink.util.Collector; - -/** - * This example shows the functionality of time based windows. It utilizes the - * {@link ActiveTriggerPolicy} implementation in the - * {@link ActiveTimeTriggerPolicy}. 
- */ -public class TimeWindowingExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep()) - .window(Count.of(100)) - .every(Time.of(1000, TimeUnit.MILLISECONDS)) - .groupBy(new MyKey()) - .sum(0); - - // emit result - if (fileOutput) { - stream.writeAsText(outputPath, 1); - } else { - stream.print(); - } - - // execute the program - env.execute("Time Windowing Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * This data source emit one element every 0.001 sec. The output is an - * Integer counting the output elements. As soon as the counter reaches - * 10000 it is reset to 0. On each reset the source waits 5 sec. before it - * restarts to produce elements. - */ - private static final class CountingSourceWithSleep extends RichSourceFunction<Integer> { - private static final long serialVersionUID = 1L; - - private int counter = 0; - private transient Random rnd; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - rnd = new Random(); - } - - @Override - public void invoke(Collector<Integer> collector) throws Exception { - // continuous emit - while (true) { - if (counter > 9999) { - System.out.println("Source pauses now!"); - Thread.sleep(5000); - System.out.println("Source continouse with emitting now!"); - counter = 0; - } - collector.collect(rnd.nextInt(9) + 1); - - // Wait 0.001 sec. before the next emit. 
Otherwise the source is - // too fast for local tests and you might always see - // SUM[k=1..9999](k) as result. - Thread.sleep(1); - counter++; - } - } - } - - private static final class MyKey implements KeySelector<Integer, Integer> { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Integer value) throws Exception { - if (value < 2) { - return 0; - } else { - return 1; - } - } - - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: TimeWindowingExample <result path>"); - return false; - } - } else { - System.out.println("Executing TimeWindowingExample with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: TimeWindowingExample <result path>"); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala new file mode 100644 index 0000000..a36a03b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -0,0 +1,74 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.join + +import java.util.concurrent.TimeUnit + +import org.apache.flink.streaming.api.scala._ + +import scala.Stream._ +import scala.language.postfixOps +import scala.util.Random + +object WindowJoin { + + case class Name(id: Long, name: String) + case class Age(id: Long, age: Int) + case class Person(name: String, age: Long) + + def main(args: Array[String]) { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create streams for names and ages by mapping the inputs to the corresponding objects + val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) + val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) + + //Join the two input streams by id on the last 2 seconds every second and create new + //Person objects containing both name and age + val joined = + names.join(ages).onWindow(2, TimeUnit.SECONDS) + .every(1, TimeUnit.SECONDS) + .where("id") + .equalTo("id") { (n, a) => Person(n.name, a.age) } + + joined print + + env.execute("WindowJoin") + } + + def nameStream() : Stream[(Long,String)] = { + def nameMapper(names: Array[String])(x: Int) : (Long, String) = + { + if(x%100==0) 
Thread.sleep(1000) + (x, names(Random.nextInt(names.length))) + } + range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", "john", "grace"))) + } + + def ageStream() : Stream[(Long,Int)] = { + def ageMapper(x: Int) : (Long, Int) = + { + if(x%100==0) Thread.sleep(1000) + (x, Random.nextInt(90)) + } + range(1,10000).map(ageMapper) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala index f357fe7..8a0ce5e 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala @@ -27,6 +27,33 @@ import org.apache.flink.util.Collector import scala.util.Random +/** + * This example showcases a moderately complex Flink Streaming pipeline. + * It computes statistics on stock market data that arrive continuously, + * and combines the stock market data with tweet streams. + * For a detailed explanation of the job, check out the + * [[http://flink.apache.org/news/2015/02/09/streaming-example.html blog post]] + * unrolling it. To run the example make sure that the service providing + * the text data is already up and running.
+ * + * To start an example socket text stream on your local machine run netcat + * from a command line, where the parameter specifies the port number: + * + * {{{ + * nc -lk 9999 + * }}} + * + * Usage: + * {{{ + * StockPrices <hostname> <port> <output path> + * }}} + * + * This example shows how to: + * + * - merge and join data streams, + * - use different windowing policies, + * - define windowing aggregations. + */ object StockPrices { case class StockPrice(symbol: String, price: Double) @@ -36,8 +63,17 @@ object StockPrices { val defaultPrice = StockPrice("", 1000) + private var fileOutput: Boolean = false + private var hostName: String = null + private var port: Int = 0 + private var outputPath: String = null + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + val env = StreamExecutionEnvironment.getExecutionEnvironment //Step 1 @@ -163,4 +199,20 @@ object StockPrices { } } + private def parseParameters(args: Array[String]): Boolean = { + if (args.length == 3) { + fileOutput = true + hostName = args(0) + port = args(1).toInt + outputPath = args(2) + } else if (args.length == 2) { + hostName = args(0) + port = args(1).toInt + } else { + System.err.println("Usage: StockPrices <hostname> <port> [<output path>]") + return false + } + true + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala deleted file mode 100644 index 119862e..0000000 --- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.scala.examples.windowing - -import org.apache.flink.streaming.api.scala._ - -import scala.Stream._ -import scala.util.Random -import java.util.concurrent.TimeUnit -import scala.language.postfixOps - -object WindowJoin { - - case class Name(id: Long, name: String) - case class Age(id: Long, age: Int) - case class Person(name: String, age: Long) - - def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - - //Create streams for names and ages by mapping the inputs to the corresponding objects - val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) - val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) - - //Join the two input streams by id on the last 2 seconds every second and create new - //Person objects containing both name and age - val joined = - names.join(ages).onWindow(2, TimeUnit.SECONDS) - .every(1, TimeUnit.SECONDS) - .where("id") - .equalTo("id") { (n, a) => Person(n.name, 
a.age) } - - joined print - - env.execute("WindowJoin") - } - - def nameStream() : Stream[(Long,String)] = { - def nameMapper(names: Array[String])(x: Int) : (Long, String) = - { - if(x%100==0) Thread.sleep(1000) - (x, names(Random.nextInt(names.length))) - } - range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", "john", "grace"))) - } - - def ageStream() : Stream[(Long,Int)] = { - def ageMapper(x: Int) : (Long, Int) = - { - if(x%100==0) Thread.sleep(1000) - (x, Random.nextInt(90)) - } - range(1,10000).map(ageMapper) - } - -}
