Repository: flink
Updated Branches:
  refs/heads/master 36fcdae58 -> 46573a6ae


[streaming] ITCase for WindowWordCount


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71b2d664
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71b2d664
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71b2d664

Branch: refs/heads/master
Commit: 71b2d664e0f98d9f5e9af8c84d2e749d4cece273
Parents: 36fcdae
Author: mbalassi <[email protected]>
Authored: Thu Apr 9 12:35:52 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Fri Apr 10 18:14:18 2015 +0200

----------------------------------------------------------------------
 .../examples/windowing/WindowWordCount.java     | 20 ++++++--
 .../examples/test/join/WindowJoinITCase.java    |  1 -
 .../test/windowing/WindowWordCountITCase.java   | 50 ++++++++++++++++++++
 3 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index cef760f..bd3acc6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -46,6 +46,10 @@ import 
org.apache.flink.streaming.examples.wordcount.WordCount;
  */
 public class WindowWordCount {
 
+       // window parameters with default values
+       private static int windowSize = 250;
+       private static int slideSize = 150;
+
        // 
*************************************************************************
        // PROGRAM
        // 
*************************************************************************
@@ -65,8 +69,8 @@ public class WindowWordCount {
                DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new WordCount.Tokenizer())
-                               // create windows of 250 records slided every 
150 records
-                               .window(Count.of(250)).every(Count.of(150))
+                               // create windows of windowSize records, sliding 
every slideSize records
+                               
.window(Count.of(windowSize)).every(Count.of(slideSize))
                                // group by the tuple field "0" and sum up 
tuple field "1"
                                .groupBy(0).sum(1)
                                // flatten the windows to a single stream
@@ -97,17 +101,23 @@ public class WindowWordCount {
                if (args.length > 0) {
                        // parse input arguments
                        fileOutput = true;
-                       if (args.length == 2) {
+                       if (args.length >= 2 && args.length <= 4) {
                                textPath = args[0];
                                outputPath = args[1];
+                               if (args.length >= 3){
+                                       windowSize = Integer.parseInt(args[2]);
+
+                                       // if no slide size is specified use the window size as the slide size
+                                       slideSize = args.length == 3 ? 
windowSize : Integer.parseInt(args[3]);
+                               }
                        } else {
-                               System.err.println("Usage: WindowWordCount 
<text path> <result path>");
+                               System.err.println("Usage: WindowWordCount 
<text path> <result path> [<window size>] [<slide size>]");
                                return false;
                        }
                } else {
                        System.out.println("Executing WindowWordCount example 
with built-in default data.");
                        System.out.println("  Provide parameters to read input 
data from a file.");
-                       System.out.println("  Usage: WindowWordCount <text 
path> <result path>");
+                       System.out.println("  Usage: WindowWordCount <text 
path> <result path> [<window size>] [<slide size>]");
                }
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
index 0c1fb39..a1bef5c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
@@ -30,7 +30,6 @@ public class WindowJoinITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void preSubmit() throws Exception {
-               setParallelism(1);
                gradesPath = createTempFile("gradesText.txt", 
WindowJoinData.GRADES_INPUT);
                salariesPath = createTempFile("salariesText.txt", 
WindowJoinData.SALARIES_INPUT);
                resultPath = getTempDirPath("result");

http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
new file mode 100644
index 0000000..6fdd4ef
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.windowing;
+
+import org.apache.flink.streaming.examples.windowing.WindowWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WindowWordCountITCase extends StreamingProgramTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+       protected String windowSize = "250";
+       protected String slideSize = "150";
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               // since the parallel tokenizers might run at different speeds,
+               // the exact output cannot be checked, only whether it is 
well-formed:
+               // checks that the result lines look like e.g. (faust,2)
+               checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               WindowWordCount.main(new String[]{textPath, resultPath, 
windowSize, slideSize});
+       }
+}

Reply via email to