[FLINK-1560] [streaming] Added ITCases to streaming examples
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464e7828 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464e7828 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464e7828 Branch: refs/heads/master Commit: 464e782868e1f697809d681ce7f8528bef4f2bdb Parents: ed7d165 Author: szape <nemderogator...@gmail.com> Authored: Wed Mar 25 09:35:39 2015 +0100 Committer: mbalassi <mbala...@apache.org> Committed: Tue Apr 7 16:09:37 2015 +0200 ---------------------------------------------------------------------- .../examples/iteration/IterateExample.java | 70 ++++---- .../iteration/util/IterateExampleData.java | 32 ++++ .../examples/join/util/WindowJoinData.java | 66 ++++++++ .../util/IncrementalLearningSkeletonData.java | 34 ++++ .../twitter/util/TwitterStreamData.java | 27 +++ .../examples/windowing/SessionWindowing.java | 55 ++++++- .../windowing/util/SessionWindowingData.java | 27 +++ .../util/TopSpeedWindowingExampleData.java | 165 +++++++++++++++++++ .../test/iteration/IterateExampleITCase.java | 45 +++++ .../examples/test/join/WindowJoinITCase.java | 48 ++++++ .../ml/IncrementalLearningSkeletonITCase.java | 42 +++++ .../socket/SocketTextStreamWordCountITCase.java | 94 +++++++++++ .../test/twitter/TwitterStreamITCase.java | 42 +++++ .../test/windowing/SessionWindowingITCase.java | 43 +++++ .../TopSpeedWindowingExampleITCase.java | 45 +++++ 15 files changed, 789 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index d9a8167..f5f2cd7 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -26,32 +26,24 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.TimeUnit; /** - * Example illustrating iterations in Flink streaming. + * Example illustrating iterations in Flink streaming. <p/> <p> The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. </p> * <p/> - * <p> - * The program sums up random numbers and counts additions it performs to reach - * a specific threshold in an iterative streaming fashion. - * </p> * <p/> - * <p/> - * This example shows how to use: - * <ul> - * <li>streaming iterations, - * <li>buffer timeout to enhance latency, - * <li>directed outputs. - * </ul> + * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed + * outputs. 
</ul> */ public class IterateExample { + private static final int BOUND = 100; + // ************************************************************************* // PROGRAM // ************************************************************************* @@ -71,7 +63,7 @@ public class IterateExample { // create input stream of integer pairs DataStream<Tuple2<Integer, Integer>> inputStream; - if(fileInput) { + if (fileInput) { inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); } else { inputStream = env.addSource(new RandomFibonacciSource()); @@ -94,10 +86,7 @@ public class IterateExample { // 'output' channel then get the input pairs that have the greatest iteration counter // on a 1 second sliding window DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output") - .map(new OutputMap()) - .window(Time.of(1L, TimeUnit.SECONDS)) - .every(Time.of(500L, TimeUnit.MILLISECONDS)) - .maxBy(1).flatten(); + .map(new OutputMap()); // emit results if (fileOutput) { @@ -124,12 +113,12 @@ public class IterateExample { @Override public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception { - while(true) { - int first = rnd.nextInt(BOUND/2 - 1) + 1; - int second = rnd.nextInt(BOUND/2 - 1) + 1; + while (true) { + int first = rnd.nextInt(BOUND / 2 - 1) + 1; + int second = rnd.nextInt(BOUND / 2 - 1) + 1; collector.collect(new Tuple2<Integer, Integer>(first, second)); - Thread.sleep(100L); + Thread.sleep(500L); } } @@ -147,18 +136,19 @@ public class IterateExample { @Override public Tuple2<Integer, Integer> map(String value) throws Exception { - Thread.sleep(100L); - String record = value.substring(1, value.length()-1); + String record = value.substring(1, value.length() - 1); String[] splitted = record.split(","); return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); } } /** - * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input 
tuple - * A counter is attached to the tuple and incremented in every iteration step + * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple A + * counter is attached to the tuple and incremented in every iteration step */ - public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> { + public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, + Integer, Integer>> { + private static final long serialVersionUID = 1L; @Override public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws @@ -171,12 +161,15 @@ public class IterateExample { * Iteration step function that calculates the next Fibonacci number */ public static class Step implements - MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> { + MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, + Integer, Integer>> { private static final long serialVersionUID = 1L; @Override - public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception { - return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); + public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, + Integer> value) throws Exception { + return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + + value.f3, ++value.f4); } } @@ -194,7 +187,6 @@ public class IterateExample { } else { output.add("output"); } - output.add("output"); return output; } } @@ -202,12 +194,16 @@ public class IterateExample { /** * Giving back the input pair and the counter */ - public static class OutputMap 
implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> { + public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, + Tuple2<Tuple2<Integer, Integer>, Integer>> { + private static final long serialVersionUID = 1L; @Override - public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws + public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> + value) throws Exception { - return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4); + return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), + value.f4); } } @@ -219,7 +215,6 @@ public class IterateExample { private static boolean fileOutput = false; private static String inputPath; private static String outputPath; - private static final int BOUND = 100; private static boolean parseParameters(String[] args) { @@ -228,7 +223,7 @@ public class IterateExample { if (args.length == 1) { fileOutput = true; outputPath = args[0]; - } else if(args.length == 2) { + } else if (args.length == 2) { fileInput = true; inputPath = args[0]; fileOutput = true; @@ -244,4 +239,5 @@ public class IterateExample { } return true; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java new file mode 100644 index 0000000..0077459 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.iteration.util; + +public class IterateExampleData { + public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" + + "(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" + + "(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)"; + + public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" + + "((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" + + "((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" + + "((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)"; + + private IterateExampleData() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java new file mode 100644 index 0000000..7d0c746 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.join.util; + +public class WindowJoinData { + + public static final String GRADES_INPUT = "(john,5)\n" + "(tom,3)\n" + "(alice,1)\n" + "(grace,5)\n" + + "(john,4)\n" + "(bob,1)\n" + "(alice,2)\n" + "(alice,3)\n" + "(bob,5)\n" + "(alice,3)\n" + "(tom,5)\n" + + "(john,2)\n" + "(john,1)\n" + "(grace,2)\n" + "(jerry,2)\n" + "(tom,4)\n" + "(bob,4)\n" + "(bob,2)\n" + + "(tom,2)\n" + "(alice,5)\n" + "(grace,5)\n" + "(grace,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(tom,1)\n" + + "(jerry,5)\n" + "(john,3)\n" + "(john,4)\n" + "(john,1)\n" + "(jerry,3)\n" + "(grace,3)\n" + "(bob,3)\n" + + "(john,3)\n" + "(jerry,4)\n" + "(tom,5)\n" + "(tom,4)\n" + "(john,2)\n" + "(jerry,1)\n" + "(bob,1)\n" + + "(john,5)\n" + "(grace,4)\n" + "(tom,5)\n" + "(john,4)\n" + "(tom,1)\n" + "(grace,1)\n" + "(john,2)\n" + + "(jerry,3)\n" + "(jerry,5)\n" + "(tom,2)\n" + "(tom,2)\n" + "(alice,4)\n" + "(tom,4)\n" + "(jerry,4)\n" + + "(john,3)\n" + "(grace,4)\n" + "(tom,3)\n" + "(jerry,4)\n" + "(john,5)\n" + "(john,4)\n" + "(jerry,1)\n" + + "(john,5)\n" + "(alice,2)\n" + "(tom,1)\n" + "(alice,5)\n" + "(grace,4)\n" + "(bob,4)\n" + "(jerry,1)\n" + + "(john,5)\n" + "(tom,4)\n" + "(tom,5)\n" + "(jerry,5)\n" + "(tom,1)\n" + "(grace,3)\n" + "(bob,5)\n" + + "(john,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(grace,1)\n" + "(jerry,1)\n" + "(jerry,4)\n" + + "(bob,4)\n" + "(alice,3)\n" + "(tom,5)\n" + "(alice,4)\n" + "(alice,4)\n" + "(grace,4)\n" + "(john,5)\n" + + "(john,5)\n" + "(grace,4)\n" + "(tom,4)\n" + "(john,4)\n" + "(john,5)\n" + "(alice,5)\n" + 
"(jerry,5)\n" + + "(john,3)\n" + "(tom,5)\n" + "(jerry,4)\n" + "(grace,4)\n" + "(john,3)\n" + "(bob,2)"; + + public static final String SALARIES_INPUT = "(john,6469)\n" + "(jerry,6760)\n" + "(jerry,8069)\n" + + "(tom,3662)\n" + "(grace,8427)\n" + "(john,9425)\n" + "(bob,9018)\n" + "(john,352)\n" + "(tom,3770)\n" + + "(grace,7622)\n" + "(jerry,7441)\n" + "(alice,1468)\n" + "(bob,5472)\n" + "(grace,898)\n" + + "(tom,3849)\n" + "(grace,1865)\n" + "(alice,5582)\n" + "(john,9511)\n" + "(alice,1541)\n" + + "(john,2477)\n" + "(grace,3561)\n" + "(john,1670)\n" + "(grace,7290)\n" + "(grace,6565)\n" + + "(tom,6179)\n" + "(tom,1601)\n" + "(john,2940)\n" + "(bob,4685)\n" + "(bob,710)\n" + "(bob,5936)\n" + + "(jerry,1412)\n" + "(grace,6515)\n" + "(grace,3321)\n" + "(tom,8088)\n" + "(john,2876)\n" + + "(bob,9896)\n" + "(grace,7368)\n" + "(grace,9749)\n" + "(bob,2048)\n" + "(alice,4782)\n" + + "(alice,3375)\n" + "(tom,5841)\n" + "(bob,958)\n" + "(bob,5258)\n" + "(tom,3935)\n" + "(jerry,4394)\n" + + "(alice,102)\n" + "(alice,4931)\n" + "(alice,5240)\n" + "(jerry,7951)\n" + "(john,5675)\n" + + "(bob,609)\n" + "(alice,5997)\n" + "(jerry,9651)\n" + "(alice,1328)\n" + "(bob,1022)\n" + + "(grace,2578)\n" + "(jerry,9704)\n" + "(tom,4476)\n" + "(grace,3784)\n" + "(alice,6144)\n" + + "(bob,6213)\n" + "(alice,7525)\n" + "(jerry,2908)\n" + "(grace,8464)\n" + "(jerry,9920)\n" + + "(bob,3720)\n" + "(bob,7612)\n" + "(alice,7211)\n" + "(jerry,6484)\n" + "(alice,1711)\n" + + "(jerry,5994)\n" + "(grace,928)\n" + "(jerry,2492)\n" + "(grace,9080)\n" + "(tom,4330)\n" + + "(bob,8302)\n" + "(john,4981)\n" + "(tom,1781)\n" + "(grace,1379)\n" + "(jerry,3700)\n" + + "(jerry,3584)\n" + "(jerry,2038)\n" + "(jerry,3902)\n" + "(tom,1336)\n" + "(jerry,7500)\n" + + "(tom,3648)\n" + "(alice,2533)\n" + "(tom,8685)\n" + "(bob,3968)\n" + "(tom,3241)\n" + "(bob,7461)\n" + + "(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + "(tom,140)\n" + "(john,9802)\n" + + "(grace,2977)\n" + "(grace,889)\n" + 
"(john,1338)"; + + public static final String WINDOW_JOIN_RESULTS = "(bob,2,9018)\n" + "(bob,2,5472)\n" + "(bob,2,4685)\n" + + "(bob,2,710)\n" + "(bob,2,5936)\n" + "(bob,2,9896)\n" + "(bob,2,2048)\n" + "(bob,2,958)\n" + + "(bob,2,5258)\n" + "(bob,2,609)\n" + "(bob,2,1022)\n" + "(bob,2,6213)\n" + "(bob,2,3720)\n" + + "(bob,2,7612)\n" + "(bob,2,8302)\n" + "(bob,2,3968)\n" + "(bob,2,7461)"; + + private WindowJoinData() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java new file mode 100644 index 0000000..dedc5ee --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.ml.util; + +public class IncrementalLearningSkeletonData { + + public static final String RESULTS = "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "0\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + + "1\n"; + + private IncrementalLearningSkeletonData() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java index 0e7976c..f0eb753 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java +++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java @@ -252,4 +252,31 @@ public class TwitterStreamData { "{\"created_at\":\"Wed Oct 01 15:40:10 +0000 2014\",\"id\":517338189300645888,\"id_str\":\"517338189300645888\",\"text\":\"\\uadc0\\uc5ec\\uc6e4\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/download\\/android\\\" rel=\\\"nofollow\\\"\\u003eTwitter for Android\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":2751941412,\"id_str\":\"2751941412\",\"name\":\"BJ\",\"screen_name\":\"beuljin\",\"location\":\"\",\"url\":\"http:\\/\\/twpf.jp\\/beuljin\",\"description\":\"\\ud504\\uc0ac\\ub294 \\uc580\\ub2d8\\uc774 \\uadf8\\ub824\\uc900 \\uc9c4\\uc9dc\\uc5c4\\uccad\\uc815\\ub9d0\\uc9f1\\uc9f1 \\uc798\\uc0dd\\uae30\\uace0 \\uba4b\\uc788\\uace0 \\uadc0\\uc5fd\\uace0 \\uc774\\uc05c \\ud788\\uc5b4\\ub85c\",\"protected\":false,\"verified\": false,\"followers_count\":6,\"friends_count\":17,\"listed_count\":0,\"favourites_count\":26,\"statuses_count\":423,\"created_at\":\"Thu Aug 21 11:51:58 +0000 
2014\",\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"lang\":\"ko\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"B2DFDA\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme13\\/bg.gif\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme13\\/bg.gif\",\"profile_background_tile\":false,\"profile_link_color\":\"00B2B8\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"FFFFFF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/515876874082197504\\/u_gw6uoS_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/515876874082197504\\/u_gw6uoS_normal.jpeg\ ",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/2751941412\\/1410318404\",\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\"urls\":[],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"ko\",\"timestamp_ms\":\"1412178010689\"}", "{\"created_at\":\"Wed Oct 01 15:40:10 +0000 2014\",\"id\":517338189304836096,\"id_str\":\"517338189304836096\",\"text\":\"RT @rukdd: \\u0e40\\u0e27\\u0e25\\u0e32\\u0e21\\u0e35\\u0e41\\u0e08\\u0e49\\u0e07\\u0e40\\u0e15\\u0e37\\u0e2d\\u0e19\\u0e44\\u0e25\\u0e19\\u0e4c\\u0e40\\u0e02\\u0e49\\u0e32\\u0e21\\u0e32 \\u0e40\\u0e23\\u0e32\\u0e01\\u0e47\\u0e2b\\u0e27\\u0e31\\u0e07\\u0e43\\u0e2b\\u0e49\\u0e40\\u0e1b\\u0e47\\u0e19\\u0e41\\u0e01\\u0e19\\u0e30\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/download\\/android\\\" rel=\\\"nofollow\\\"\\u003eTwitter for 
Android\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":2534725814,\"id_str\":\"2534725814\",\"name\":\"PtrsyMk. \\u0e49\",\"screen_name\":\"PtrsyMk_pig\",\"location\":\"\",\"url\":null,\"description\":\"\\u0e0a\\u0e37\\u0e48\\u0e2d\\u0e40\\u0 e2d\\u0e25\\u0e1f\\u0e4c \\u0e40\\u0e1e\\u0e37\\u0e48\\u0e2d\\u0e19\\u0e40\\u0e23\\u0e35\\u0e22\\u0e01 \\u0e2d\\u0e49\\u0e27\\u0e19 \\u0e2d\\u0e35\",\"protected\":false,\"verified\":false,\"followers_count\":21,\"friends_count\":136,\"listed_count\":0,\"favourites_count\":2905,\"statuses_count\":1851,\"created_at\":\"Fri May 30 10:00:58 +0000 2014\",\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":true,\"lang\":\"th\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg .com\\/profile_images\\/515551616275996672\\/G7zVgThg_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/515551616275996672\\/G7zVgThg_normal.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/2534725814\\/1411141386\",\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Mon Sep 22 15:05:10 +0000 
2014\",\"id\":514067891071627265,\"id_str\":\"514067891071627265\",\"text\":\"\\u0e40\\u0e27\\u0e25\\u0e32\\u0e21\\u0e35\\u0e41\\u0e08\\u0e49\\u0e07\\u0e40\\u0e15\\u0e37\\u0e2d\\u0e19\\u0e44\\u0e25\\u0e19\\u0e4c\\u0e40\\u0e02\\u0e49\\u0e32\\u0e21\\u0e32 \\u0e40\\u0e23\\u0e32\\u0e01\\u0e47\\u0e2b\\u0e27\\u0e31\\u0e07\\u0e43\\u0e2b\\u0e49\\u0e40\\u0e1b\\u0e47\\u0e19\\u0e41\\u0e01\\u0e19\\u0e30\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.c om\\/download\\/iphone\\\" rel=\\\"nofollow\\\"\\u003eTwitter for iPhone\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":374499538,\"id_str\":\"374499538\",\"name\":\"rukdd\",\"screen_name\":\"rukdd\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":65917,\"friends_count\":19,\"listed_count\":39,\"favourites_count\":6,\"statuses_count\":11763,\"created_at\":\"Fri Sep 16 12:34:29 +0000 2011\",\"utc_offset\":-25200,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"th\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_background_images\\/622183294\\/njxkahnghgkbsv5cmg8x.jpeg\",\"profile_background_image_url_https\":\ 
"https:\\/\\/pbs.twimg.com\\/profile_background_images\\/622183294\\/njxkahnghgkbsv5cmg8x.jpeg\",\"profile_background_tile\":false,\"profile_link_color\":\"EB8DB3\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"E2F5DC\",\"profile_text_color\":\"6CAD0A\",\"profile_use_background_image\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/492971634190327808\\/xbxo82GM_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/492971634190327808\\/xbxo82GM_normal.jpeg\",\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":3298,\"favorite_count\":396,\"entities\":{\"hashtags\":[],\"trends\":[],\"urls\":[],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"low\",\"lang\": \"th\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"rukdd\",\"name\":\"rukdd\",\"id\":374499538,\"id_str\":\"374499538\",\"indices\":[3,9]}],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"th\",\"timestamp_ms\":\"1412178010716\"}" }; + + public static final String STREAMING_COUNTS_AS_TUPLES = "(Immigration:,1)\n" + "(Never,1)\n" + "(Report,1)\n" + + "(http://t.co/89DNq2IUFK,1)\n" + "(fans,1)\n" + "(celebrating,1)\n" + "(night,1)\n" + + "(http://t.co/XrRhhiQE0Aâ,1)\n" + "(have,1)\n" + "(deep,1)\n" + "(arts,1)\n" + "(Super-cute!,1)\n" + + "(#ARTP,1)\n" + "(Tired,1)\n" + "(#hungry,1)\n" + "(95,1)\n" + "(Fast,1)\n" + "(4,1)\n" + "(she,1)\n" + + "(want,1)\n" + "(just,1)\n" + "(us.,1)\n" + "(Brooks,1)\n" + "(drops,1)\n" + "(application,1)\n" + + "(Sun,1)\n" + "(Elizabeth,1)\n" + "(RT.,1)\n" + "(#Romance,1)\n" + "(Free,1)\n" + 
"(Movie,1)\n" + + "(Girl,1)\n" + "((@GoneGirlMovie),1)\n" + "(TX,1)\n" + "(Code),1)\n" + "(money,1)\n" + "(my,1)\n" + + "(dreams,1)\n" + "(money,2)\n" + "(Tired,2)\n" + "(am,2)\n" + "(just,2)\n" + "(my,2)\n" + "(me,2)\n" + + "(me,3)\n" + "(am,3)\n" + "(just,3)\n" + "(my,3)\n" + "(am,4)\n" + "(just,4)\n" + "(just,5)\n" + + "(my,5)\n" + "(just,6)\n" + "(just,7)\n" + "(70,1)\n" + "(Percent,1)\n" + "(Federal,1)\n" + + "(@BD_Lay,1)\n" + "(I,1)\n" + "(Knew,1)\n" + "(\uD83D\uDE02\uD83D\uDE2D,1)\n" + "(â@theawayfans:,1)\n" + "(Roma,1)\n" + + "(The,1)\n" + "(Etihad,1)\n" + "(in,1)\n" + "(the,1)\n" + "(therâ¦,1)\n" + "(a,1)\n" + "(the,2)\n" + + "(the,3)\n" + "(the,4)\n" + "(I,4)\n" + "(I,5)\n" + "(the,5)\n" + "(I,6)\n" + "(I,7)\n" + "(the,7)\n" + + "(the,8)\n" + "(the,9)\n" + "(the,10)\n" + "(the,11)\n" + "(the,12)\n" + "(the,13)\n" + "(the,14)\n" + + "(the,15)\n" + "(the,16)\n" + "(the,17)\n" + "(the,18)\n" + "(the,19)\n" + "(the,20)\n" + "(the,21)\n" + + "(the,22)\n" + "(the,23)\n" + "(RT,1)\n" + "(@jennybethm:,1)\n" + "(Illegal,1)\n" + "(Families,1)\n" + + "(Agents,1)\n" + "(RT,2)\n" + "(RT,3)\n" + "(RT,4)\n" + "(RT,5)\n" + "(RT,6)\n" + "(RT,7)\n" + "(RT,8)\n" + + "(RT,9)\n" + "(RT,10)\n" + "(RT,11)\n" + "(RT,12)\n" + "(RT,13)\n" + "(RT,14)\n" + "(RT,15)\n" + + "(RT,16)\n" + "(RT,17)\n" + "(RT,18)\n" + "(RT,19)\n" + "(RT,20)\n" + "(RT,21)\n" + "(of,1)\n" + + "(Released,1)\n" + "(Back,1)\n" + "(To,1)\n" + "(#teaparty,1)\n" + "(It,1)\n" + "(goal,1)\n" + "(at,1)\n" + + "(lad,1)\n" + "(hat,1)\n" + "(You,1)\n" + "(appreciation,1)\n" + "(of,2)\n" + "(At,2)\n" + "(of,3)\n" + + "(of,4)\n" + "(of,5)\n" + "(of,6)\n" + "(of,7)\n" + "(you,7)\n" + "(of,8)\n" + "(of,9)\n" + "(of,10)\n" + + "(of,11)\n" + "(of,12)\n" + "(of,13)\n" + "(of,14)"; + + private TwitterStreamData() { + } } http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 5fa8689..dc5ce42 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -17,9 +17,6 @@ package org.apache.flink.streaming.examples.windowing; -import java.util.ArrayList; -import java.util.List; - import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -28,11 +25,20 @@ import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; import org.apache.flink.util.Collector; +import java.util.ArrayList; +import java.util.List; + public class SessionWindowing { @SuppressWarnings("serial") public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2); + + if (!parseParameters(args)) { + return; + } + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); final List<Tuple3<String, Long, Integer>> input = new ArrayList<Tuple3<String, Long, Integer>>(); @@ -57,9 +63,11 @@ public class SessionWindowing { // We sleep three seconds between every output so we // can see whether we properly detect sessions // before the next start for a specific id - Thread.sleep(3000); collector.collect(value); - 
System.out.println("Collected: " + value); + if (!fileOutput) { + System.out.println("Collected: " + value); + Thread.sleep(3000); + } } } @@ -69,10 +77,16 @@ public class SessionWindowing { }); // We create sessions for each id with max timeout of 3 time units - source.groupBy(0) + DataStream<Tuple3<String, Long, Integer>> aggregated = source.groupBy(0) .window(new SessionTriggerPolicy(3L), new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2) - .flatten().print(); + .flatten(); + + if (fileOutput) { + aggregated.writeAsText(outputPath); + } else { + aggregated.print(); + } env.execute(); } @@ -115,7 +129,7 @@ public class SessionWindowing { // belongs to a different group if (timeSinceLastEvent > sessionTimeout) { - return new Object[] { datapoint }; + return new Object[]{datapoint}; } else { return null; } @@ -127,4 +141,27 @@ public class SessionWindowing { } } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + // parse input arguments + if (args.length == 1) { + fileOutput = true; + outputPath = args[0]; + } else { + System.err.println("Usage: SessionWindowing <result path>"); + return false; + } + } + return true; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java new file mode 100644 index 0000000..bb4a123 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.windowing.util; + +public class SessionWindowingData { + + public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,5,3)\n" + + "(a,10,1)"; + + private SessionWindowingData() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java new file mode 100644 index 0000000..c390ec2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.windowing.util; + +public class TopSpeedWindowingExampleData { + + public static final String CAR_DATA = + "(0,55,15.277777777777777,1424951918630)\n" + "(1,45,12.5,1424951918632)\n" + + "(0,50,29.166666666666664,1424951919632)\n" + "(1,50,26.38888888888889,1424951919632)\n" + + "(0,55,44.44444444444444,1424951920633)\n" + "(1,45,38.888888888888886,1424951920633)\n" + + "(0,50,58.33333333333333,1424951921634)\n" + "(1,40,50.0,1424951921634)\n" + + "(0,55,73.6111111111111,1424951922634)\n" + "(1,35,59.72222222222222,1424951922634)\n" + + "(0,60,90.27777777777777,1424951923634)\n" + "(1,40,70.83333333333333,1424951923634)\n" + + "(0,65,108.33333333333333,1424951924635)\n" + "(1,35,80.55555555555554,1424951924635)\n" + + "(0,60,125.0,1424951925635)\n" + "(1,40,91.66666666666666,1424951925635)\n" + + "(0,55,140.27777777777777,1424951926635)\n" + "(1,45,104.16666666666666,1424951926636)\n" + + "(0,60,156.94444444444443,1424951927636)\n" + "(1,50,118.05555555555554,1424951927636)\n" + + "(0,55,172.2222222222222,1424951928636)\n" + "(1,45,130.55555555555554,1424951928636)\n" + + "(0,50,186.1111111111111,1424951929636)\n" + "(1,50,144.44444444444443,1424951929637)\n" + + "(0,55,201.38888888888886,1424951930637)\n" + "(1,55,159.7222222222222,1424951930637)\n" + + "(0,60,218.05555555555551,1424951931637)\n" + "(1,60,176.38888888888886,1424951931637)\n" + + "(0,55,233.3333333333333,1424951932637)\n" + "(1,65,194.4444444444444,1424951932638)\n" + + "(0,50,247.22222222222217,1424951933638)\n" + "(1,70,213.88888888888886,1424951933638)\n" + + "(0,45,259.7222222222222,1424951934638)\n" + "(1,65,231.9444444444444,1424951934638)\n" + + "(0,50,273.6111111111111,1424951935638)\n" + "(1,70,251.38888888888886,1424951935639)\n" + + "(0,55,288.88888888888886,1424951936639)\n" + "(1,75,272.2222222222222,1424951936639)\n" + + "(0,50,302.77777777777777,1424951937639)\n" + "(1,70,291.66666666666663,1424951937639)\n" + + 
"(0,45,315.27777777777777,1424951938640)\n" + "(1,65,309.7222222222222,1424951938640)\n" + + "(0,50,329.1666666666667,1424951939640)\n" + "(1,70,329.16666666666663,1424951939640)\n" + + "(0,55,344.44444444444446,1424951940640)\n" + "(1,65,347.2222222222222,1424951940640)\n" + + "(0,50,358.33333333333337,1424951941641)\n" + "(1,70,366.66666666666663,1424951941641)\n" + + "(0,55,373.61111111111114,1424951942641)\n" + "(1,65,384.7222222222222,1424951942641)\n" + + "(0,50,387.50000000000006,1424951943641)\n" + "(1,70,404.16666666666663,1424951943641)\n" + + "(0,45,400.00000000000006,1424951944642)\n" + "(1,65,422.2222222222222,1424951944642)\n" + + "(0,50,413.88888888888897,1424951945642)\n" + "(1,60,438.88888888888886,1424951945642)\n" + + "(0,45,426.38888888888897,1424951946642)\n" + "(1,65,456.9444444444444,1424951946642)\n" + + "(0,40,437.50000000000006,1424951947643)\n" + "(1,70,476.38888888888886,1424951947643)\n" + + "(0,45,450.00000000000006,1424951948643)\n" + "(1,75,497.2222222222222,1424951948643)\n" + + "(0,40,461.11111111111114,1424951949643)\n" + "(1,80,519.4444444444443,1424951949644)\n" + + "(0,45,473.61111111111114,1424951950644)\n" + "(1,75,540.2777777777777,1424951950644)\n" + + "(0,50,487.50000000000006,1424951951644)\n" + "(1,80,562.4999999999999,1424951951644)\n" + + "(0,45,500.00000000000006,1424951952644)\n" + "(1,85,586.111111111111,1424951952645)\n" + + "(0,40,511.11111111111114,1424951953645)\n" + "(1,80,608.3333333333331,1424951953645)\n" + + "(0,35,520.8333333333334,1424951954645)\n" + "(1,75,629.1666666666665,1424951954645)\n" + + "(0,40,531.9444444444445,1424951955645)\n" + "(1,70,648.611111111111,1424951955646)\n" + + "(0,45,544.4444444444445,1424951956646)\n" + "(1,75,669.4444444444443,1424951956646)\n" + + "(0,50,558.3333333333334,1424951957646)\n" + "(1,80,691.6666666666665,1424951957646)\n" + + "(0,55,573.6111111111112,1424951958646)\n" + "(1,85,715.2777777777776,1424951958647)\n" + + "(0,60,590.2777777777778,1424951959647)\n" + 
"(1,80,737.4999999999998,1424951959647)\n" + + "(0,65,608.3333333333334,1424951960647)\n" + "(1,85,761.1111111111109,1424951960647)\n" + + "(0,70,627.7777777777778,1424951961647)\n" + "(1,80,783.333333333333,1424951961648)\n" + + "(0,75,648.6111111111112,1424951962648)\n" + "(1,85,806.9444444444441,1424951962648)\n" + + "(0,80,670.8333333333334,1424951963648)\n" + "(1,90,831.9444444444441,1424951963648)\n" + + "(0,75,691.6666666666667,1424951964649)\n" + "(1,95,858.333333333333,1424951964649)\n" + + "(0,70,711.1111111111112,1424951965649)\n" + "(1,90,883.333333333333,1424951965649)\n" + + "(0,75,731.9444444444446,1424951966649)\n" + "(1,95,909.722222222222,1424951966649)\n" + + "(0,70,751.388888888889,1424951967649)\n" + "(1,100,937.4999999999998,1424951967650)\n" + + "(0,75,772.2222222222224,1424951968650)\n" + "(1,100,965.2777777777776,1424951968650)\n" + + "(0,80,794.4444444444446,1424951969650)\n" + "(1,100,993.0555555555554,1424951969650)\n" + + "(0,75,815.2777777777779,1424951970651)\n" + "(1,100,1020.8333333333333,1424951970651)\n" + + "(0,80,837.5000000000001,1424951971651)\n" + "(1,100,1048.611111111111,1424951971651)\n" + + "(0,85,861.1111111111112,1424951972651)\n" + "(1,100,1076.388888888889,1424951972651)\n" + + "(0,80,883.3333333333334,1424951973652)\n" + "(1,95,1102.7777777777778,1424951973652)\n" + + "(0,75,904.1666666666667,1424951974652)\n" + "(1,100,1130.5555555555557,1424951974652)\n" + + "(0,70,923.6111111111112,1424951975652)\n" + "(1,100,1158.3333333333335,1424951975652)\n" + + "(0,75,944.4444444444446,1424951976653)\n" + "(1,100,1186.1111111111113,1424951976653)\n" + + "(0,80,966.6666666666667,1424951977653)\n" + "(1,95,1212.5000000000002,1424951977653)\n" + + "(0,75,987.5000000000001,1424951978653)\n" + "(1,100,1240.277777777778,1424951978653)\n" + + "(0,80,1009.7222222222223,1424951979654)\n" + "(1,100,1268.0555555555559,1424951979654)\n" + + "(0,85,1033.3333333333335,1424951980654)\n" + "(1,100,1295.8333333333337,1424951980654)\n" + + 
"(0,90,1058.3333333333335,1424951981654)\n" + "(1,100,1323.6111111111115,1424951981654)\n" + + "(0,85,1081.9444444444446,1424951982655)\n" + "(1,100,1351.3888888888894,1424951982655)\n" + + "(0,90,1106.9444444444446,1424951983655)\n" + "(1,100,1379.1666666666672,1424951983655)\n" + + "(0,95,1133.3333333333335,1424951984655)\n" + "(1,100,1406.944444444445,1424951984656)\n" + + "(0,90,1158.3333333333335,1424951985656)\n" + "(1,95,1433.333333333334,1424951985656)\n" + + "(0,95,1184.7222222222224,1424951986656)\n" + "(1,90,1458.333333333334,1424951986656)\n" + + "(0,90,1209.7222222222224,1424951987656)\n" + "(1,95,1484.7222222222229,1424951987657)\n" + + "(0,85,1233.3333333333335,1424951988657)\n" + "(1,90,1509.7222222222229,1424951988657)\n" + + "(0,80,1255.5555555555557,1424951989657)\n" + "(1,95,1536.1111111111118,1424951989657)\n" + + "(0,85,1279.1666666666667,1424951990657)\n" + "(1,100,1563.8888888888896,1424951990658)\n" + + "(0,90,1304.1666666666667,1424951991658)\n" + "(1,95,1590.2777777777785,1424951991658)\n" + + "(0,95,1330.5555555555557,1424951992658)\n" + "(1,90,1615.2777777777785,1424951992658)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + "(1,95,1641.6666666666674,1424951993659)\n" + + "(0,100,1386.1111111111113,1424951994659)\n" + "(1,100,1669.4444444444453,1424951994659)\n" + + "(0,95,1412.5000000000002,1424951995659)\n" + "(1,95,1695.8333333333342,1424951995660)\n" + + "(0,100,1440.277777777778,1424951996660)\n" + "(1,90,1720.8333333333342,1424951996660)\n" + + "(0,100,1468.0555555555559,1424951997660)\n" + "(1,85,1744.4444444444453,1424951997660)\n" + + "(0,95,1494.4444444444448,1424951998660)\n" + "(1,80,1766.6666666666674,1424951998661)\n" + + "(0,100,1522.2222222222226,1424951999661)\n" + "(1,75,1787.5000000000007,1424951999661)\n" + + "(0,95,1548.6111111111115,1424952000661)\n" + "(1,80,1809.7222222222229,1424952000661)\n" + + "(0,90,1573.6111111111115,1424952001662)\n" + "(1,75,1830.555555555556,1424952001662)\n" + + 
"(0,95,1600.0000000000005,1424952002662)\n" + "(1,80,1852.7777777777783,1424952002662)\n" + + "(0,100,1627.7777777777783,1424952003662)\n" + "(1,85,1876.3888888888894,1424952003662)\n" + + "(0,100,1655.555555555556,1424952004663)\n" + "(1,80,1898.6111111111115,1424952004663)\n" + + "(0,95,1681.944444444445,1424952005663)\n" + "(1,85,1922.2222222222226,1424952005663)\n" + + "(0,100,1709.7222222222229,1424952006663)\n" + "(1,90,1947.2222222222226,1424952006664)\n" + + "(0,100,1737.5000000000007,1424952007664)\n" + "(1,95,1973.6111111111115,1424952007664)\n" + + "(0,95,1763.8888888888896,1424952008664)\n" + "(1,90,1998.6111111111115,1424952008664)\n" + + "(0,100,1791.6666666666674,1424952009664)\n" + "(1,85,2022.2222222222226,1424952009665)\n" + + "(0,95,1818.0555555555563,1424952010665)\n" + "(1,80,2044.4444444444448,1424952010665)\n" + + "(0,90,1843.0555555555563,1424952011665)\n" + "(1,75,2065.2777777777783,1424952011665)\n" + + "(0,95,1869.4444444444453,1424952012666)\n" + "(1,80,2087.5000000000005,1424952012666)\n" + + "(0,100,1897.222222222223,1424952013666)\n" + "(1,85,2111.1111111111118,1424952013666)\n" + + "(0,95,1923.611111111112,1424952014666)\n" + "(1,90,2136.1111111111118,1424952014666)\n" + + "(0,100,1951.3888888888898,1424952015667)\n" + "(1,85,2159.722222222223,1424952015667)\n" + + "(0,95,1977.7777777777787,1424952016667)\n" + "(1,90,2184.722222222223,1424952016667)\n" + + "(0,100,2005.5555555555566,1424952017667)\n" + "(1,95,2211.1111111111118,1424952017668)"; + + public static final String TOP_SPEEDS = + "(0,55,44.44444444444444,1424951920633)\n" + "(1,40,50.0,1424951921634)\n" + + "(0,65,108.33333333333333,1424951924635)\n" + "(1,45,104.16666666666666,1424951926636)\n" + + "(0,55,172.2222222222222,1424951928636)\n" + "(1,55,159.7222222222222,1424951930637)\n" + + "(0,55,233.3333333333333,1424951932637)\n" + "(1,70,213.88888888888886,1424951933638)\n" + + "(0,55,288.88888888888886,1424951936639)\n" + "(1,75,272.2222222222222,1424951936639)\n" + + 
"(1,70,329.16666666666663,1424951939640)\n" + "(0,55,344.44444444444446,1424951940640)\n" + + "(1,65,384.7222222222222,1424951942641)\n" + "(0,45,400.00000000000006,1424951944642)\n" + + "(1,60,438.88888888888886,1424951945642)\n" + "(1,75,497.2222222222222,1424951948643)\n" + + "(0,40,461.11111111111114,1424951949643)\n" + "(1,80,562.4999999999999,1424951951644)\n" + + "(0,35,520.8333333333334,1424951954645)\n" + "(1,75,629.1666666666665,1424951954645)\n" + + "(1,80,691.6666666666665,1424951957646)\n" + "(0,55,573.6111111111112,1424951958646)\n" + + "(1,85,761.1111111111109,1424951960647)\n" + "(0,70,627.7777777777778,1424951961647)\n" + + "(1,90,831.9444444444441,1424951963648)\n" + "(0,75,691.6666666666667,1424951964649)\n" + + "(1,90,883.333333333333,1424951965649)\n" + "(0,70,751.388888888889,1424951967649)\n" + + "(1,100,937.4999999999998,1424951967650)\n" + "(1,100,993.0555555555554,1424951969650)\n" + + "(0,75,815.2777777777779,1424951970651)\n" + "(1,100,1048.611111111111,1424951971651)\n" + + "(0,80,883.3333333333334,1424951973652)\n" + "(1,95,1102.7777777777778,1424951973652)\n" + + "(1,100,1158.3333333333335,1424951975652)\n" + "(0,75,944.4444444444446,1424951976653)\n" + + "(1,95,1212.5000000000002,1424951977653)\n" + "(0,80,1009.7222222222223,1424951979654)\n" + + "(1,100,1268.0555555555559,1424951979654)\n" + "(1,100,1323.6111111111115,1424951981654)\n" + + "(0,85,1081.9444444444446,1424951982655)\n" + "(1,100,1379.1666666666672,1424951983655)\n" + + "(0,95,1133.3333333333335,1424951984655)\n" + "(1,95,1433.333333333334,1424951985656)\n" + + "(0,95,1184.7222222222224,1424951986656)\n" + "(1,95,1484.7222222222229,1424951987657)\n" + + "(0,80,1255.5555555555557,1424951989657)\n" + "(1,95,1536.1111111111118,1424951989657)\n" + + "(0,90,1304.1666666666667,1424951991658)\n" + "(1,95,1590.2777777777785,1424951991658)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + "(1,95,1641.6666666666674,1424951993659)\n" + + 
"(0,95,1412.5000000000002,1424951995659)\n" + "(1,95,1695.8333333333342,1424951995660)\n" + + "(0,100,1468.0555555555559,1424951997660)\n" + "(1,80,1766.6666666666674,1424951998661)\n" + + "(0,100,1522.2222222222226,1424951999661)\n" + "(0,90,1573.6111111111115,1424952001662)\n" + + "(1,75,1830.555555555556,1424952001662)\n" + "(0,100,1627.7777777777783,1424952003662)\n" + + "(1,80,1898.6111111111115,1424952004663)\n" + "(0,95,1681.944444444445,1424952005663)\n" + + "(1,90,1947.2222222222226,1424952006664)\n" + "(0,100,1737.5000000000007,1424952007664)\n" + + "(0,100,1791.6666666666674,1424952009664)\n" + "(1,85,2022.2222222222226,1424952009665)\n" + + "(0,90,1843.0555555555563,1424952011665)\n" + "(1,80,2087.5000000000005,1424952012666)\n" + + "(0,100,1897.222222222223,1424952013666)\n" + "(0,100,1951.3888888888898,1424952015667)\n" + + "(1,85,2159.722222222223,1424952015667)\n" + "(0,100,2005.5555555555566,1424952017667)\n" + + "(1,95,2211.1111111111118,1424952017668)"; + + private TopSpeedWindowingExampleData() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java new file mode 100644 index 0000000..7c971be --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.test.iteration; + +import org.apache.flink.streaming.examples.iteration.IterateExample; +import org.apache.flink.streaming.examples.iteration.util.IterateExampleData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class IterateExampleITCase extends StreamingProgramTestBase { + + + protected String inputPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(IterateExampleData.RESULTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + IterateExample.main(new String[]{inputPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java new file mode 100644 index 0000000..ddab597 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.test.join; + +import org.apache.flink.streaming.examples.join.WindowJoin; +import org.apache.flink.streaming.examples.join.util.WindowJoinData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class WindowJoinITCase extends StreamingProgramTestBase { + + protected String gradesPath; + protected String salariesPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + setParallelism(1); + gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); + salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WindowJoinData.WINDOW_JOIN_RESULTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java new file mode 100644 index 0000000..d10aacd --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.test.ml; + +import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton; +import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + IncrementalLearningSkeleton.main(new String[]{resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java new file mode 100644 index 0000000..b16a85f --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.test.socket; + +import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.junit.Assert; + +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; + +public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase { + + private static final String HOST = "localhost"; + private static final String PORT = "9999"; + protected String resultPath; + + private ServerSocket temporarySocket; + + @Override + protected void preSubmit() throws Exception { + temporarySocket = createSocket(HOST, Integer.valueOf(PORT), WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + temporarySocket.close(); + } + + @Override + protected void testProgram() throws Exception { + SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath}); + } + + public ServerSocket createSocket(String host, int port, String contents) throws Exception { + ServerSocket serverSocket = new ServerSocket(port); + ServerThread st = new ServerThread(serverSocket, contents); + st.start(); + return serverSocket; + } + + private static class ServerThread extends Thread { + + private ServerSocket serverSocket; + private String contents; + private Thread t; + + public ServerThread(ServerSocket serverSocket, String contents) { + this.serverSocket = serverSocket; + this.contents = contents; + t = new Thread(this); + } + + public void waitForAccept() throws Exception { + Socket socket = serverSocket.accept(); + PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); + writer.println(contents); + writer.close(); + socket.close(); + } + + public void run() { + try { + waitForAccept(); + } catch 
(Exception e) { + Assert.fail(); + } + } + + @Override + public void start() { + t.start(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java new file mode 100644 index 0000000..1dc5eb5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.test.twitter; + +import org.apache.flink.streaming.examples.twitter.TwitterStream; +import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class TwitterStreamITCase extends StreamingProgramTestBase { + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(TwitterStreamData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + + @Override + protected void testProgram() throws Exception { + TwitterStream.main(new String[]{resultPath}); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java new file mode 100644 index 0000000..5e332f1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.test.windowing; + +import org.apache.flink.streaming.examples.windowing.SessionWindowing; +import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +/** Integration test that runs the {@link SessionWindowing} example with a temporary result path and checks its output against {@link SessionWindowingData#EXPECTED}. */ public class SessionWindowingITCase extends StreamingProgramTestBase { + + /** Output location handed to the example; assigned in {@link #preSubmit()}. */ protected String resultPath; + + /** Calls {@code setParallelism(2)} and stores the value returned by {@code getTempDirPath("result")} as the result location. NOTE(review): presumably invoked by the test base before the program is run; confirm against StreamingProgramTestBase. */ @Override + protected void preSubmit() throws Exception { + setParallelism(2); + resultPath = getTempDirPath("result"); + } + + /** Passes the expected output and {@code resultPath} to {@code compareResultsByLinesInMemory}. NOTE(review): presumably invoked after the program has finished; confirm against StreamingProgramTestBase. */ @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath); + } + + /** Calls the example's {@code main} with {@code resultPath} as its only argument. */ @Override + protected void testProgram() throws Exception { + SessionWindowing.main(new String[]{resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java new file mode 100644 index 0000000..f973bd1 --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.test.windowing; + +import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; +import org.apache.flink.streaming.examples.windowing.TopSpeedWindowingExample; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +/** Integration test that runs the {@link TopSpeedWindowingExample} example with an input path and a result path and checks its output against {@link TopSpeedWindowingExampleData#TOP_SPEEDS}. */ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase { + /** Input location handed to the example as first argument; assigned in {@link #preSubmit()}. */ protected String textPath; + /** Output location handed to the example as second argument; assigned in {@link #preSubmit()}. */ protected String resultPath; + + /** Calls {@code setParallelism(1)}, stores the value returned by {@code createTempFile("text.txt", CAR_DATA)} as the input location and the value returned by {@code getTempDirPath("result")} as the result location. NOTE(review): presumably invoked by the test base before the program is run, and createTempFile presumably writes CAR_DATA to a temporary file; confirm against StreamingProgramTestBase. */ @Override + protected void preSubmit() throws Exception { + setParallelism(1); + textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); + resultPath = getTempDirPath("result"); + } + + /** Passes the expected top speeds and {@code resultPath} to {@code compareResultsByLinesInMemory}. NOTE(review): presumably invoked after the program has finished; confirm against StreamingProgramTestBase. */ @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath); + } + + /** Calls the example's {@code main} with {@code textPath} and {@code resultPath}, in that order. */ @Override + protected void testProgram() throws Exception { + TopSpeedWindowingExample.main(new String[]{textPath, resultPath}); + + } +}