[FLINK-2139] [streaming] Streaming outputformat tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f72e5c8c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f72e5c8c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f72e5c8c Branch: refs/heads/master Commit: f72e5c8cd781b2d32aa89fd62f3b0b8c78ded0f8 Parents: 2298cfe Author: mbalassi <mbala...@apache.org> Authored: Thu Jun 4 11:04:46 2015 +0200 Committer: mbalassi <mbala...@apache.org> Committed: Sat Jun 6 13:56:54 2015 +0200 ---------------------------------------------------------------------- .../flink/api/avro/AvroOutputFormatITCase.java | 173 +++++++++++++++++++ .../flink/api/avro/AvroOutputFormatTest.java | 173 ------------------- .../api/functions/sink/SocketClientSink.java | 2 + .../api/outputformat/CsvOutputFormatITCase.java | 78 +++++++++ .../outputformat/SocketOutputFormatITCase.java | 52 ++++++ .../outputformat/TextOutputFormatITCase.java | 55 ++++++ .../streaming/util/SocketOutputTestBase.java | 129 ++++++++++++++ .../streaming/util/SocketProgramITCaseBase.java | 2 +- .../socket/SocketTextStreamWordCountITCase.java | 2 +- .../socket/SocketTextStreamWordCountITCase.java | 2 +- .../flink-streaming-scala/pom.xml | 20 ++- .../scala/api/CsvOutputFormatITCase.java | 67 +++++++ .../scala/api/SocketOutputFormatITCase.java | 36 ++++ .../scala/api/TextOutputFormatITCase.java | 43 +++++ .../api/scala/OutputFormatTestPrograms.scala | 76 ++++++++ 15 files changed, 733 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java 
new file mode 100644 index 0000000..d40fec5 --- /dev/null +++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.avro; + +import org.junit.Assert; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.io.DatumReader; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.flink.api.io.avro.example.User; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.util.JavaProgramTestBase; + +@SuppressWarnings("serial") +public class AvroOutputFormatITCase extends JavaProgramTestBase { + + public static String outputPath1; + + public static String outputPath2; + + public static String inputPath; + + public static String userData = "alice|1|blue\n" + + "bob|2|red\n" + + 
"john|3|yellow\n" + + "walt|4|black\n"; + + @Override + protected void preSubmit() throws Exception { + inputPath = createTempFile("user", userData); + outputPath1 = getTempDirPath("avro_output1"); + outputPath2 = getTempDirPath("avro_output2"); + } + + + @Override + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath) + .fieldDelimiter("|") + .types(String.class, Integer.class, String.class); + + //output the data with AvroOutputFormat for specific user type + DataSet<User> specificUser = input.map(new ConvertToUser()); + specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1); + + //output the data with AvroOutputFormat for reflect user type + DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective()); + reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2); + + env.execute(); + } + + @Override + protected void postSubmit() throws Exception { + //compare result for specific user type + File [] output1; + File file1 = asFile(outputPath1); + if (file1.isDirectory()) { + output1 = file1.listFiles(); + // check for avro ext in dir. 
+ for (File avroOutput : output1) { + Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro")); + } + } else { + output1 = new File[] {file1}; + } + List<String> result1 = new ArrayList<String>(); + DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class); + for (File avroOutput : output1) { + + DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1); + while (dataFileReader1.hasNext()) { + User user = dataFileReader1.next(); + result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); + } + } + for (String expectedResult : userData.split("\n")) { + Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult)); + } + + //compare result for reflect user type + File [] output2; + File file2 = asFile(outputPath2); + if (file2.isDirectory()) { + output2 = file2.listFiles(); + } else { + output2 = new File[] {file2}; + } + List<String> result2 = new ArrayList<String>(); + DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class); + for (File avroOutput : output2) { + DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2); + while (dataFileReader2.hasNext()) { + ReflectiveUser user = dataFileReader2.next(); + result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); + } + } + for (String expectedResult : userData.split("\n")) { + Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult)); + } + + + } + + + public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> { + + @Override + public User map(Tuple3<String, Integer, String> value) throws Exception { + return new User(value.f0, value.f1, value.f2); + } + } + + public final static class ConvertToReflective extends 
RichMapFunction<User, ReflectiveUser> { + + @Override + public ReflectiveUser map(User value) throws Exception { + return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString()); + } + } + + + public static class ReflectiveUser { + private String name; + private int favoriteNumber; + private String favoriteColor; + + public ReflectiveUser() {} + + public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) { + this.name = name; + this.favoriteNumber = favoriteNumber; + this.favoriteColor = favoriteColor; + } + + public String getName() { + return this.name; + } + public String getFavoriteColor() { + return this.favoriteColor; + } + public int getFavoriteNumber() { + return this.favoriteNumber; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java deleted file mode 100644 index a8bace3..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import org.junit.Assert; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.file.DataFileReader; -import org.apache.avro.io.DatumReader; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.flink.api.io.avro.example.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.util.JavaProgramTestBase; - -@SuppressWarnings("serial") -public class AvroOutputFormatTest extends JavaProgramTestBase { - - public static String outputPath1; - - public static String outputPath2; - - public static String inputPath; - - public static String userData = "alice|1|blue\n" + - "bob|2|red\n" + - "john|3|yellow\n" + - "walt|4|black\n"; - - @Override - protected void preSubmit() throws Exception { - inputPath = createTempFile("user", userData); - outputPath1 = getTempDirPath("avro_output1"); - outputPath2 = getTempDirPath("avro_output2"); - } - - - @Override - protected void testProgram() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath) - .fieldDelimiter("|") - .types(String.class, Integer.class, String.class); - - //output the data with 
AvroOutputFormat for specific user type - DataSet<User> specificUser = input.map(new ConvertToUser()); - specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1); - - //output the data with AvroOutputFormat for reflect user type - DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective()); - reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2); - - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - //compare result for specific user type - File [] output1; - File file1 = asFile(outputPath1); - if (file1.isDirectory()) { - output1 = file1.listFiles(); - // check for avro ext in dir. - for (File avroOutput : output1) { - Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro")); - } - } else { - output1 = new File[] {file1}; - } - List<String> result1 = new ArrayList<String>(); - DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class); - for (File avroOutput : output1) { - - DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1); - while (dataFileReader1.hasNext()) { - User user = dataFileReader1.next(); - result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); - } - } - for (String expectedResult : userData.split("\n")) { - Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult)); - } - - //compare result for reflect user type - File [] output2; - File file2 = asFile(outputPath2); - if (file2.isDirectory()) { - output2 = file2.listFiles(); - } else { - output2 = new File[] {file2}; - } - List<String> result2 = new ArrayList<String>(); - DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class); - for (File avroOutput : output2) { - DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, 
userDatumReader2); - while (dataFileReader2.hasNext()) { - ReflectiveUser user = dataFileReader2.next(); - result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); - } - } - for (String expectedResult : userData.split("\n")) { - Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult)); - } - - - } - - - public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> { - - @Override - public User map(Tuple3<String, Integer, String> value) throws Exception { - return new User(value.f0, value.f1, value.f2); - } - } - - public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> { - - @Override - public ReflectiveUser map(User value) throws Exception { - return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString()); - } - } - - - public static class ReflectiveUser { - private String name; - private int favoriteNumber; - private String favoriteColor; - - public ReflectiveUser() {} - - public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) { - this.name = name; - this.favoriteNumber = favoriteNumber; - this.favoriteColor = favoriteColor; - } - - public String getName() { - return this.name; - } - public String getFavoriteColor() { - return this.favoriteColor; - } - public int getFavoriteNumber() { - return this.favoriteNumber; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index cd6c21c..3fd2678 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -85,6 +85,8 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> { if(LOG.isErrorEnabled()){ LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e); } + throw new RuntimeException("Cannot send message \"" + value.toString() + + "\" to socket server at " + hostName + ":" + port, e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java new file mode 100644 index 0000000..68e2a75 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.outputformat; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +public class CsvOutputFormatITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> text = env.fromElements(WordCountData.TEXT); + + DataStream<Tuple2<String, Integer>> counts = + text.flatMap(new Tokenizer()) + .groupBy(0).sum(1); + + counts.writeAsCsv(resultPath); + + env.execute("WriteAsCsvTest"); + } + + @Override + protected void postSubmit() throws Exception { + //Strip the parentheses from the expected text like output + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES + .replaceAll("[\\\\(\\\\)]", ""), resultPath); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) 
+ throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java new file mode 100644 index 0000000..bf96cc1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.outputformat; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.SocketOutputTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.junit.Ignore; + +@Ignore +//This test sometimes failes most likely due to the behaviour +//of the socket. Disabled for now. +public class SocketOutputFormatITCase extends SocketOutputTestBase { + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> text = env.fromElements(WordCountData.TEXT); + + DataStream<String> counts = + text.flatMap(new CsvOutputFormatITCase.Tokenizer()) + .groupBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() { + @Override + public String map(Tuple2<String, Integer> value) throws Exception { + return value.toString() + "\n"; + } + }); + counts.writeToSocket(HOST, port, new DummyStringSchema()); + + env.execute("WriteToSocketTest"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java new file mode 100644 index 0000000..3c48b3f --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.outputformat; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class TextOutputFormatITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> text = env.fromElements(WordCountData.TEXT); + + DataStream<Tuple2<String, Integer>> counts = + text.flatMap(new CsvOutputFormatITCase.Tokenizer()) + .groupBy(0).sum(1); + + counts.writeAsText(resultPath); + + env.execute("WriteAsTextTest"); + } + + @Override + protected 
void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java new file mode 100644 index 0000000..a6e1e7e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.test.testdata.WordCountData; +import org.junit.Assert; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * Test base for streaming programs relying on an open server socket to write to. + */ +public abstract class SocketOutputTestBase extends StreamingProgramTestBase { + + protected static final String HOST = "localhost"; + protected static Integer port; + protected Set<String> dataReadFromSocket = new HashSet<String>(); + + @Override + protected void preSubmit() throws Exception { + port = NetUtils.getAvailablePort(); + temporarySocket = createLocalSocket(port); + } + + @Override + protected void postSubmit() throws Exception { + Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n"))); + Assert.assertEquals(expectedData, dataReadFromSocket); + temporarySocket.close(); + } + + protected ServerSocket temporarySocket; + + public ServerSocket createLocalSocket(int port) throws Exception { + ServerSocket serverSocket = new ServerSocket(port); + ServerThread st = new ServerThread(serverSocket); + st.start(); + return serverSocket; + } + + protected class ServerThread extends Thread { + + private ServerSocket serverSocket; + private Thread t; + + public ServerThread(ServerSocket serverSocket) { + this.serverSocket = serverSocket; + t = new Thread(this); + } + + public void waitForAccept() throws Exception { + Socket socket = serverSocket.accept(); + BufferedReader in 
= new BufferedReader(new InputStreamReader(socket.getInputStream())); + DeserializationSchema<String> schema = new DummyStringSchema(); + String rawData = in.readLine(); + while (rawData != null){ + String string = schema.deserialize(rawData.getBytes()); + dataReadFromSocket.add(string); + rawData = in.readLine(); + } + socket.close(); + } + + public void run() { + try { + waitForAccept(); + } catch (Exception e) { + Assert.fail(); + throw new RuntimeException(e); + } + } + + @Override + public void start() { + t.start(); + } + } + + public static class DummyStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]>{ + private static final long serialVersionUID = 1L; + + @Override + public boolean isEndOfStream(String nextElement) { + return nextElement.equals("q"); + } + + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + + @Override + public String deserialize(byte[] message) { + return new String(message); + } + + @Override + public TypeInformation<String> getProducedType() { + return TypeExtractor.getForClass(String.class); + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java index 43b061e..37f6958 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java @@ -86,4 +86,4 @@ public abstract class 
SocketProgramITCaseBase extends StreamingProgramTestBase { t.start(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java index 20f6ebe..838834b 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java @@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase { SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java index cfde04f..b3629ad 
100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java @@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase { SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml index 51bea21..9ea30fc 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml +++ b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml @@ -72,6 +72,7 @@ under the License. <version>${guava.version}</version> </dependency> + <!-- To access general test utils --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-tests</artifactId> @@ -80,6 +81,23 @@ under the License. <type>test-jar</type> </dependency> + <!-- To access test data --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- To access streaming test utils --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> <build> @@ -124,7 +142,7 @@ under the License. 
</compilerPlugins> </configuration> </plugin> - + <!-- Eclipse Integration --> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java new file mode 100644 index 0000000..0c60719 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.api; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +public class CsvOutputFormatITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + //Strip the parentheses from the expected text like output + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES + .replaceAll("[\\\\(\\\\)]", ""), resultPath); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) + throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java new file mode 100644 index 0000000..a2a78b7 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.api; + +import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; +import org.apache.flink.streaming.util.SocketOutputTestBase; +import org.apache.flink.streaming.util.SocketProgramITCaseBase; +import org.apache.flink.test.testdata.WordCountData; +import org.junit.Ignore; + +@Ignore +//This test fails intermittently, most likely due to the behaviour +//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase { + + @Override + protected void testProgram() throws Exception { + OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java new file mode 100644 index 0000000..530ba67 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.api; + +import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class TextOutputFormatITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala new file mode 100644 index 0000000..88b0f4f --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema +import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema + +import scala.language.existentials + +/** + * Test programs for built in output formats. Invoked from the Java *OutputFormatITCase tests. + */ +object OutputFormatTestPrograms { + + def wordCountToText(input : String, outputPath : String) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create a stream from the input text and count the occurrences of each word + val text = env.fromElements(input) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + counts.writeAsText(outputPath) + + env.execute("Scala WordCountToText") + } + + def wordCountToCsv(input : String, outputPath : String) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create a stream from the input text and count the occurrences of each word + val text = env.fromElements(input) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + counts.writeAsCsv(outputPath) + + env.execute("Scala WordCountToCsv") + } + + def wordCountToSocket(input : String, outputHost : String, outputPort : Int) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create a stream from the input text and count the occurrences of each word + val text =
env.fromElements(input) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + .map(tuple => tuple.toString() + "\n") + + counts.writeToSocket(outputHost, outputPort, new DummyStringSchema()) + + env.execute("Scala WordCountToCsv") + } + +}