[streaming] [scala] Scala SocketTextStream added and minor fixes: organized imports for streaming Scala examples; added template parameter for Scala streaming iterate; minor fixes in streaming examples.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b22406a6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b22406a6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b22406a6 Branch: refs/heads/master Commit: b22406a6e27a9528452d62602fe25c416633467b Parents: 19066b5 Author: mbalassi <[email protected]> Authored: Tue Jan 6 22:50:18 2015 +0100 Committer: mbalassi <[email protected]> Committed: Thu Jan 8 13:35:28 2015 +0100 ---------------------------------------------------------------------- .../socket/SocketTextStreamWordCount.java | 59 +++++++------ .../socket/SocketTextStreamWordCount.scala | 91 ++++++++++++++++++++ .../examples/windowing/TopSpeedWindowing.scala | 5 +- .../scala/examples/windowing/WindowJoin.scala | 11 +-- .../flink/streaming/api/scala/DataStream.scala | 4 +- 5 files changed, 134 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java index ec32e9f..e9b60f4 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java @@ -23,11 +23,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; /** - * This example shows an implementation of WordCount with data from socket. - * + * This example shows an implementation of WordCount with data from a text + * socket. To run the example make sure that the service providing the text data + * is already up and running. + * * <p> - * Usage: <code>SocketTextStreamWordCount <hostname> <port > <result path></code><br> - * + * To start an example socket text stream on your local machine run netcat from + * a command line: <code>nc -lk 9999</code>, where the parameter specifies the + * port number. + * + * + * <p> + * Usage: + * <code>SocketTextStreamWordCount <hostname> <port> <result path></code> + * <br> + * * <p> * This example shows how to: * <ul> @@ -35,6 +45,8 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; * <li>write a simple Flink program, * <li>write and use user-defined functions. * </ul> + * + * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a> */ public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { @@ -44,16 +56,18 @@ public class SocketTextStreamWordCount { } // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment + .getExecutionEnvironment(); // get input data - DataStream<String> text = env.socketTextStream(hostname, port); + DataStream<String> text = env.socketTextStream(hostName, port); DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0).sum(1); + .groupBy(0) + .sum(1); if (fileOutput) { counts.writeAsText(outputPath, 1); @@ -62,7 +76,7 @@ public class SocketTextStreamWordCount { } // execute program - env.execute("WordCount with SocketTextStream 
Example"); + env.execute("WordCount from SocketTextStream Example"); } // ************************************************************************* @@ -70,30 +84,23 @@ public class SocketTextStreamWordCount { // ************************************************************************* private static boolean fileOutput = false; - private static String hostname; + private static String hostName; private static int port; private static String outputPath; private static boolean parseParameters(String[] args) { - if (args.length > 0) { - // parse input arguments - if (args.length == 3) { - fileOutput = true; - hostname = args[0]; - port = Integer.valueOf(args[1]); - outputPath = args[2]; - } else if (args.length == 2) { - hostname = args[0]; - port = Integer.valueOf(args[1]); - } else { - System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> <output path>"); - return false; - } + // parse input arguments + if (args.length == 3) { + fileOutput = true; + hostName = args[0]; + port = Integer.valueOf(args[1]); + outputPath = args[2]; + } else if (args.length == 2) { + hostName = args[0]; + port = Integer.valueOf(args[1]); } else { - System.out.println("Executing WordCount example with data from socket."); - System.out.println(" Provide parameters to connect data source."); - System.out.println(" Usage: SocketTextStreamWordCount <hostname> <port> <output path>"); + System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]"); return false; } return true; http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala new file mode 100644 index 0000000..b38764c --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.socket + +import org.apache.flink.streaming.api.scala._ + +/** + * This example shows an implementation of WordCount with data from a text socket. + * To run the example make sure that the service providing the text data is already up and running. + * + * To start an example socket text stream on your local machine run netcat from a command line, + * where the parameter specifies the port number: + * + * {{{ + * nc -lk 9999 + * }}} + * + * Usage: + * {{{ + * SocketTextStreamWordCount <hostname> <port> <output path> + * }}} + * + * This example shows how to: + * + * - use StreamExecutionEnvironment.socketTextStream + * - write a simple Flink Streaming program in scala. + * - write and use user-defined functions. 
+ */ +object SocketTextStreamWordCount { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create streams for names and ages by mapping the inputs to the corresponding objects + val text = env.socketTextStream(hostName, port) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + if (fileOutput) { + counts.writeAsCsv(outputPath, 1) + } else { + counts print + } + + env.execute("Scala SocketTextStreamWordCount Example") + } + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length == 3) { + fileOutput = true + hostName = args(0) + port = args(1).toInt + outputPath = args(2) + } else if (args.length == 2) { + hostName = args(0) + port = args(1).toInt + } else { + System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]") + return false + } + true + } + + private var fileOutput: Boolean = false + private var hostName: String = null + private var port: Int = 0 + private var outputPath: String = null + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index a43f479..e3ef95e 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -18,12 +18,11 @@ package org.apache.flink.streaming.scala.examples.windowing - import java.util.concurrent.TimeUnit._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.windowing.{Delta, Time} -import org.apache.flink.api.scala._ + import scala.Stream._ import scala.math._ import scala.util.Random http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index d6c0363..0b78365 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -18,8 +18,7 @@ package org.apache.flink.streaming.scala.examples.windowing -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala._ import scala.Stream._ import scala.util.Random @@ -39,11 +38,13 @@ object WindowJoin { val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) - //Join the two input streams by id on the last second every 2 seconds and create new + //Join the two 
input streams by id on the last 2 seconds every second and create new //Person objects containing both name and age val joined = - names.join(ages).onWindow(1, TimeUnit.SECONDS).every(2, TimeUnit.SECONDS) - .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) } + names.join(ages).onWindow(2, TimeUnit.SECONDS) + .every(1, TimeUnit.SECONDS) + .where("id") + .equalTo("id") { (n, a) => Person(n.name, a.age) } joined print http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 6d94de7..ffe91cb 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -214,8 +214,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * * */ - def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: - Long = 0): DataStream[T] = { + def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), + maxWaitTimeMillis:Long = 0): DataStream[R] = { val iterativeStream = javaStream.iterate(maxWaitTimeMillis) val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
