Repository: flink Updated Branches: refs/heads/master 36fcdae58 -> 46573a6ae
[streaming] ITCase for WindowWordCount Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71b2d664 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71b2d664 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71b2d664 Branch: refs/heads/master Commit: 71b2d664e0f98d9f5e9af8c84d2e749d4cece273 Parents: 36fcdae Author: mbalassi <[email protected]> Authored: Thu Apr 9 12:35:52 2015 +0200 Committer: Gyula Fora <[email protected]> Committed: Fri Apr 10 18:14:18 2015 +0200 ---------------------------------------------------------------------- .../examples/windowing/WindowWordCount.java | 20 ++++++-- .../examples/test/join/WindowJoinITCase.java | 1 - .../test/windowing/WindowWordCountITCase.java | 50 ++++++++++++++++++++ 3 files changed, 65 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java index cef760f..bd3acc6 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java @@ -46,6 +46,10 @@ import org.apache.flink.streaming.examples.wordcount.WordCount; */ public class WindowWordCount { + // window parameters (counted in records) with default values + private static int windowSize = 250; + private 
static int slideSize = 150; + // ************************************************************************* // PROGRAM // ************************************************************************* @@ -65,8 +69,8 @@ public class WindowWordCount { DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new WordCount.Tokenizer()) - // create windows of 250 records slided every 150 records - .window(Count.of(250)).every(Count.of(150)) + // create windows of windowSize records that slide every slideSize records + .window(Count.of(windowSize)).every(Count.of(slideSize)) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0).sum(1) // flatten the windows to a single stream @@ -97,17 +101,23 @@ public class WindowWordCount { if (args.length > 0) { // parse input arguments fileOutput = true; - if (args.length == 2) { + if (args.length >= 2 && args.length <= 4) { textPath = args[0]; outputPath = args[1]; + if (args.length >= 3){ + windowSize = Integer.parseInt(args[2]); + + // if no slide size is given (args[3]), slide by the window size (tumbling windows) + slideSize = args.length == 3 ? 
windowSize : Integer.parseInt(args[3]); + } } else { - System.err.println("Usage: WindowWordCount <text path> <result path>"); + System.err.println("Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]"); return false; } } else { System.out.println("Executing WindowWordCount example with built-in default data."); System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" Usage: WindowWordCount <text path> <result path>"); + System.out.println(" Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]"); } return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java index 0c1fb39..a1bef5c 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java @@ -30,7 +30,6 @@ public class WindowJoinITCase extends StreamingProgramTestBase { @Override protected void preSubmit() throws Exception { - setParallelism(1); gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); resultPath = getTempDirPath("result"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java new file mode 100644 index 0000000..6fdd4ef --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.test.windowing; + +import org.apache.flink.streaming.examples.windowing.WindowWordCount; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class WindowWordCountITCase extends StreamingProgramTestBase { + + protected String textPath; + protected String resultPath; + protected String windowSize = "250"; + protected String slideSize = "150"; + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + // the parallel tokenizers may run at different speeds, so the exact + // output cannot be checked; only verify every line is well-formed, + // i.e. a whole (word,count) tuple such as (faust,2) + checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,\\d+\\)$"); + } + + @Override + protected void testProgram() throws Exception { + WindowWordCount.main(new String[]{textPath, resultPath, windowSize, slideSize}); + } +}
