[FLINK-2139] [streaming] Streaming outputformat tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f72e5c8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f72e5c8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f72e5c8c

Branch: refs/heads/master
Commit: f72e5c8cd781b2d32aa89fd62f3b0b8c78ded0f8
Parents: 2298cfe
Author: mbalassi <mbala...@apache.org>
Authored: Thu Jun 4 11:04:46 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200

----------------------------------------------------------------------
 .../flink/api/avro/AvroOutputFormatITCase.java  | 173 +++++++++++++++++++
 .../flink/api/avro/AvroOutputFormatTest.java    | 173 -------------------
 .../api/functions/sink/SocketClientSink.java    |   2 +
 .../api/outputformat/CsvOutputFormatITCase.java |  78 +++++++++
 .../outputformat/SocketOutputFormatITCase.java  |  52 ++++++
 .../outputformat/TextOutputFormatITCase.java    |  55 ++++++
 .../streaming/util/SocketOutputTestBase.java    | 129 ++++++++++++++
 .../streaming/util/SocketProgramITCaseBase.java |   2 +-
 .../socket/SocketTextStreamWordCountITCase.java |   2 +-
 .../socket/SocketTextStreamWordCountITCase.java |   2 +-
 .../flink-streaming-scala/pom.xml               |  20 ++-
 .../scala/api/CsvOutputFormatITCase.java        |  67 +++++++
 .../scala/api/SocketOutputFormatITCase.java     |  36 ++++
 .../scala/api/TextOutputFormatITCase.java       |  43 +++++
 .../api/scala/OutputFormatTestPrograms.scala    |  76 ++++++++
 15 files changed, 733 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
new file mode 100644
index 0000000..d40fec5
--- /dev/null
+++ 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class AvroOutputFormatITCase extends JavaProgramTestBase {
+
+       public static String outputPath1;
+
+       public static String outputPath2;
+
+       public static String inputPath;
+
+       public static String userData = "alice|1|blue\n" +
+               "bob|2|red\n" +
+               "john|3|yellow\n" +
+               "walt|4|black\n";
+
+       @Override
+       protected void preSubmit() throws Exception {
+               inputPath = createTempFile("user", userData);
+               outputPath1 = getTempDirPath("avro_output1");
+               outputPath2 = getTempDirPath("avro_output2");
+       }
+
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<String, Integer, String>> input = 
env.readCsvFile(inputPath)
+                       .fieldDelimiter("|")
+                       .types(String.class, Integer.class, String.class);
+
+               //output the data with AvroOutputFormat for specific user type
+               DataSet<User> specificUser = input.map(new ConvertToUser());
+               specificUser.write(new AvroOutputFormat<User>(User.class), 
outputPath1);
+
+               //output the data with AvroOutputFormat for reflect user type
+               DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new 
ConvertToReflective());
+               reflectiveUser.write(new 
AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+               env.execute();
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               //compare result for specific user type
+               File [] output1;
+               File file1 = asFile(outputPath1);
+               if (file1.isDirectory()) {
+                       output1 = file1.listFiles();
+                       // check for avro ext in dir.
+                       for (File avroOutput : output1) {
+                               Assert.assertTrue("Expect extension '.avro'", 
avroOutput.toString().endsWith(".avro"));
+                       }
+               } else {
+                       output1 = new File[] {file1};
+               }
+               List<String> result1 = new ArrayList<String>();
+               DatumReader<User> userDatumReader1 = new 
SpecificDatumReader<User>(User.class);
+               for (File avroOutput : output1) {
+
+                       DataFileReader<User> dataFileReader1 = new 
DataFileReader<User>(avroOutput, userDatumReader1);
+                       while (dataFileReader1.hasNext()) {
+                               User user = dataFileReader1.next();
+                               result1.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+                       }
+               }
+               for (String expectedResult : userData.split("\n")) {
+                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result1.contains(expectedResult));
+               }
+
+               //compare result for reflect user type
+               File [] output2;
+               File file2 = asFile(outputPath2);
+               if (file2.isDirectory()) {
+                       output2 = file2.listFiles();
+               } else {
+                       output2 = new File[] {file2};
+               }
+               List<String> result2 = new ArrayList<String>();
+               DatumReader<ReflectiveUser> userDatumReader2 = new 
ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+               for (File avroOutput : output2) {
+                       DataFileReader<ReflectiveUser> dataFileReader2 = new 
DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+                       while (dataFileReader2.hasNext()) {
+                               ReflectiveUser user = dataFileReader2.next();
+                               result2.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+                       }
+               }
+               for (String expectedResult : userData.split("\n")) {
+                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result2.contains(expectedResult));
+               }
+
+
+       }
+
+
+       public final static class ConvertToUser extends 
RichMapFunction<Tuple3<String, Integer, String>, User> {
+
+               @Override
+               public User map(Tuple3<String, Integer, String> value) throws 
Exception {
+                       return new User(value.f0, value.f1, value.f2);
+               }
+       }
+
+       public final static class ConvertToReflective extends 
RichMapFunction<User, ReflectiveUser> {
+
+               @Override
+               public ReflectiveUser map(User value) throws Exception {
+                       return new ReflectiveUser(value.getName().toString(), 
value.getFavoriteNumber(), value.getFavoriteColor().toString());
+               }
+       }
+
+       
+       public static class ReflectiveUser {
+               private String name;
+               private int favoriteNumber;
+               private String favoriteColor;
+
+               public ReflectiveUser() {}
+
+               public ReflectiveUser(String name, int favoriteNumber, String 
favoriteColor) {
+                       this.name = name;
+                       this.favoriteNumber = favoriteNumber;
+                       this.favoriteColor = favoriteColor;
+               }
+               
+               public String getName() {
+                       return this.name;
+               }
+               public String getFavoriteColor() {
+                       return this.favoriteColor;
+               }
+               public int getFavoriteNumber() {
+                       return this.favoriteNumber;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
deleted file mode 100644
index a8bace3..0000000
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatTest extends JavaProgramTestBase {
-
-       public static String outputPath1;
-
-       public static String outputPath2;
-
-       public static String inputPath;
-
-       public static String userData = "alice|1|blue\n" +
-               "bob|2|red\n" +
-               "john|3|yellow\n" +
-               "walt|4|black\n";
-
-       @Override
-       protected void preSubmit() throws Exception {
-               inputPath = createTempFile("user", userData);
-               outputPath1 = getTempDirPath("avro_output1");
-               outputPath2 = getTempDirPath("avro_output2");
-       }
-
-
-       @Override
-       protected void testProgram() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple3<String, Integer, String>> input = 
env.readCsvFile(inputPath)
-                       .fieldDelimiter("|")
-                       .types(String.class, Integer.class, String.class);
-
-               //output the data with AvroOutputFormat for specific user type
-               DataSet<User> specificUser = input.map(new ConvertToUser());
-               specificUser.write(new AvroOutputFormat<User>(User.class), 
outputPath1);
-
-               //output the data with AvroOutputFormat for reflect user type
-               DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new 
ConvertToReflective());
-               reflectiveUser.write(new 
AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               //compare result for specific user type
-               File [] output1;
-               File file1 = asFile(outputPath1);
-               if (file1.isDirectory()) {
-                       output1 = file1.listFiles();
-                       // check for avro ext in dir.
-                       for (File avroOutput : output1) {
-                               Assert.assertTrue("Expect extension '.avro'", 
avroOutput.toString().endsWith(".avro"));
-                       }
-               } else {
-                       output1 = new File[] {file1};
-               }
-               List<String> result1 = new ArrayList<String>();
-               DatumReader<User> userDatumReader1 = new 
SpecificDatumReader<User>(User.class);
-               for (File avroOutput : output1) {
-
-                       DataFileReader<User> dataFileReader1 = new 
DataFileReader<User>(avroOutput, userDatumReader1);
-                       while (dataFileReader1.hasNext()) {
-                               User user = dataFileReader1.next();
-                               result1.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-                       }
-               }
-               for (String expectedResult : userData.split("\n")) {
-                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result1.contains(expectedResult));
-               }
-
-               //compare result for reflect user type
-               File [] output2;
-               File file2 = asFile(outputPath2);
-               if (file2.isDirectory()) {
-                       output2 = file2.listFiles();
-               } else {
-                       output2 = new File[] {file2};
-               }
-               List<String> result2 = new ArrayList<String>();
-               DatumReader<ReflectiveUser> userDatumReader2 = new 
ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-               for (File avroOutput : output2) {
-                       DataFileReader<ReflectiveUser> dataFileReader2 = new 
DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
-                       while (dataFileReader2.hasNext()) {
-                               ReflectiveUser user = dataFileReader2.next();
-                               result2.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-                       }
-               }
-               for (String expectedResult : userData.split("\n")) {
-                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result2.contains(expectedResult));
-               }
-
-
-       }
-
-
-       public final static class ConvertToUser extends 
RichMapFunction<Tuple3<String, Integer, String>, User> {
-
-               @Override
-               public User map(Tuple3<String, Integer, String> value) throws 
Exception {
-                       return new User(value.f0, value.f1, value.f2);
-               }
-       }
-
-       public final static class ConvertToReflective extends 
RichMapFunction<User, ReflectiveUser> {
-
-               @Override
-               public ReflectiveUser map(User value) throws Exception {
-                       return new ReflectiveUser(value.getName().toString(), 
value.getFavoriteNumber(), value.getFavoriteColor().toString());
-               }
-       }
-
-       
-       public static class ReflectiveUser {
-               private String name;
-               private int favoriteNumber;
-               private String favoriteColor;
-
-               public ReflectiveUser() {}
-
-               public ReflectiveUser(String name, int favoriteNumber, String 
favoriteColor) {
-                       this.name = name;
-                       this.favoriteNumber = favoriteNumber;
-                       this.favoriteColor = favoriteColor;
-               }
-               
-               public String getName() {
-                       return this.name;
-               }
-               public String getFavoriteColor() {
-                       return this.favoriteColor;
-               }
-               public int getFavoriteNumber() {
-                       return this.favoriteNumber;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index cd6c21c..3fd2678 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -85,6 +85,8 @@ public class SocketClientSink<IN> extends 
RichSinkFunction<IN> {
                        if(LOG.isErrorEnabled()){
                                LOG.error("Cannot send message to socket server 
at " + hostName + ":" + port, e);
                        }
+                       throw new RuntimeException("Cannot send message \"" + 
value.toString() +
+                                       "\" to socket server at " + hostName + 
":" + port, e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..68e2a75
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+               DataStream<Tuple2<String, Integer>> counts =
+                               text.flatMap(new Tokenizer())
+                                               .groupBy(0).sum(1);
+
+               counts.writeAsCsv(resultPath);
+
+               env.execute("WriteAsCsvTest");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               //Strip the parentheses from the expected text like output
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+                               .replaceAll("[\\\\(\\\\)]", ""), resultPath);
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out)
+                               throws Exception {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
new file mode 100644
index 0000000..bf96cc1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.SocketOutputTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Ignore;
+
+@Ignore
+//This test sometimes failes most likely due to the behaviour
+//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase {
+
+       @Override
+       protected void testProgram() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+               DataStream<String> counts =
+                               text.flatMap(new 
CsvOutputFormatITCase.Tokenizer())
+                                               .groupBy(0).sum(1).map(new 
MapFunction<Tuple2<String, Integer>, String>() {
+                                       @Override
+                                       public String map(Tuple2<String, 
Integer> value) throws Exception {
+                                               return value.toString() + "\n";
+                                       }
+                               });
+               counts.writeToSocket(HOST, port, new DummyStringSchema());
+
+               env.execute("WriteToSocketTest");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
new file mode 100644
index 0000000..3c48b3f
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+               DataStream<Tuple2<String, Integer>> counts =
+                               text.flatMap(new 
CsvOutputFormatITCase.Tokenizer())
+                                               .groupBy(0).sum(1);
+
+               counts.writeAsText(resultPath);
+
+               env.execute("WriteAsTextTest");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, 
resultPath);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
new file mode 100644
index 0000000..a6e1e7e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test base for streaming programs relying on an open server socket to write 
to.
+ */
+public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
+
+       protected static final String HOST = "localhost";
+       protected static Integer port;
+       protected Set<String> dataReadFromSocket = new HashSet<String>();
+
+       @Override
+       protected void preSubmit() throws Exception {
+               port = NetUtils.getAvailablePort();
+               temporarySocket = createLocalSocket(port);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               Set<String> expectedData = new 
HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
+               Assert.assertEquals(expectedData, dataReadFromSocket);
+               temporarySocket.close();
+       }
+
+       protected ServerSocket temporarySocket;
+
+       public ServerSocket createLocalSocket(int port) throws Exception {
+               ServerSocket serverSocket = new ServerSocket(port);
+               ServerThread st = new ServerThread(serverSocket);
+               st.start();
+               return serverSocket;
+       }
+
+       protected class ServerThread extends Thread {
+
+               private ServerSocket serverSocket;
+               private Thread t;
+
+               public ServerThread(ServerSocket serverSocket) {
+                       this.serverSocket = serverSocket;
+                       t = new Thread(this);
+               }
+
+               public void waitForAccept() throws Exception {
+                       Socket socket = serverSocket.accept();
+                       BufferedReader in = new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
+                       DeserializationSchema<String> schema = new 
DummyStringSchema();
+                       String rawData = in.readLine();
+                       while (rawData != null){
+                               String string = 
schema.deserialize(rawData.getBytes());
+                               dataReadFromSocket.add(string);
+                               rawData = in.readLine();
+                       }
+                       socket.close();
+               }
+
+               public void run() {
+                       try {
+                               waitForAccept();
+                       } catch (Exception e) {
+                               Assert.fail();
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public void start() {
+                       t.start();
+               }
+       }
+
+       public static class DummyStringSchema implements 
DeserializationSchema<String>, SerializationSchema<String, byte[]>{
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public boolean isEndOfStream(String nextElement) {
+               return nextElement.equals("q");
+       }
+
+               @Override
+               public byte[] serialize(String element) {
+               return element.getBytes();
+       }
+
+               @Override
+               public String deserialize(byte[] message) {
+               return new String(message);
+       }
+
+               @Override
+               public TypeInformation<String> getProducedType() {
+               return TypeExtractor.getForClass(String.class);
+       }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
index 43b061e..37f6958 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
@@ -86,4 +86,4 @@ public abstract class SocketProgramITCaseBase extends 
StreamingProgramTestBase {
                        t.start();
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
index 20f6ebe..838834b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends 
SocketProgramITCaseBase {
                SocketTextStreamWordCount.main(new String[]{HOST, 
port.toString(), resultPath});
        }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
index cfde04f..b3629ad 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends 
SocketProgramITCaseBase {
                SocketTextStreamWordCount.main(new String[]{HOST, 
port.toString(), resultPath});
        }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml 
b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
index 51bea21..9ea30fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
@@ -72,6 +72,7 @@ under the License.
                        <version>${guava.version}</version>
                </dependency>
 
+               <!-- To access general test utils -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-tests</artifactId>
@@ -80,6 +81,23 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
+               <!-- To access test data -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- To access streaming test utils -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
        </dependencies>
 
        <build>
@@ -124,7 +142,7 @@ under the License.
                                   </compilerPlugins>
                                </configuration>
                        </plugin>
-                       
+
                        <!-- Eclipse Integration -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..0c60719
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, 
resultPath);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               //Strip the parentheses from the expected text like output
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+                               .replaceAll("[\\\\(\\\\)]", ""), resultPath);
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out)
+                               throws Exception {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
new file mode 100644
index 0000000..a2a78b7
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.SocketOutputTestBase;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Ignore;
+
+@Ignore
+//This test sometimes failes most likely due to the behaviour
+//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase {
+
+               @Override
+               protected void testProgram() throws Exception {
+                       
OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
+               }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
new file mode 100644
index 0000000..530ba67
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, 
resultPath);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, 
resultPath);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
new file mode 100644
index 0000000..88b0f4f
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
+
+import scala.language.existentials
+
+/**
+ * Test programs for built in output formats. Invoked from {@link 
OutputFormatTest}.
+ */
+object OutputFormatTestPrograms  {
+
+  def wordCountToText(input : String, outputPath : String) : Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the 
corresponding objects
+    val text = env.fromElements(input)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    counts.writeAsText(outputPath)
+
+    env.execute("Scala WordCountToText")
+  }
+
+  def wordCountToCsv(input : String, outputPath : String) : Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the 
corresponding objects
+    val text = env.fromElements(input)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    counts.writeAsCsv(outputPath)
+
+    env.execute("Scala WordCountToCsv")
+  }
+
+  def wordCountToSocket(input : String, outputHost : String, outputPort : Int) 
: Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the 
corresponding objects
+    val text = env.fromElements(input)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+      .map(tuple => tuple.toString() + "\n")
+
+    counts.writeToSocket(outputHost, outputPort, new DummyStringSchema())
+
+    env.execute("Scala WordCountToCsv")
+  }
+
+}

Reply via email to