[scala] [streaming] Finished scala StreamExecutionEnvironment functionality + DataStream sinks + docs
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/de06d958 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/de06d958 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/de06d958 Branch: refs/heads/master Commit: de06d9589cfab83183458d18eeeadd4d604c8a65 Parents: c123e11 Author: Gyula Fora <[email protected]> Authored: Sat Dec 13 01:08:08 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 18:34:38 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 337 ++++--------------- .../environment/StreamExecutionEnvironment.java | 40 ++- .../api/function/sink/WriteFormatAsCsv.java | 6 +- .../api/function/sink/WriteSinkFunction.java | 15 +- .../sink/WriteSinkFunctionByBatches.java | 49 --- .../sink/WriteSinkFunctionByMillis.java | 7 +- .../flink/streaming/api/WriteAsCsvTest.java | 176 ---------- .../flink/streaming/api/WriteAsTextTest.java | 177 ---------- .../flink/api/scala/streaming/DataStream.scala | 181 +++++++++- .../streaming/StreamExecutionEnvironment.scala | 103 +++++- 10 files changed, 389 insertions(+), 702 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 1cf8d72..3a3cdc4 100644 --- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -44,9 +44,9 @@ import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.function.aggregation.SumAggregator; import org.apache.flink.streaming.api.function.sink.PrintSinkFunction; import org.apache.flink.streaming.api.function.sink.SinkFunction; +import org.apache.flink.streaming.api.function.sink.WriteFormat; import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv; import org.apache.flink.streaming.api.function.sink.WriteFormatAsText; -import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches; import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis; import org.apache.flink.streaming.api.invokable.SinkInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; @@ -275,7 +275,9 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned by the selected fields. + * partitioned by the selected fields. This setting only effects the how the + * outputs will be distributed between the parallel instances of the next + * processing operator. * * @param fields * The fields to partition by. @@ -289,7 +291,9 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned by the given field expressions. + * partitioned by the given field expressions. This setting only effects the + * how the outputs will be distributed between the parallel instances of the + * next processing operator. * * @param fields * The fields expressions to partition by. 
@@ -303,7 +307,9 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned using the given {@link KeySelector}. + * partitioned using the given {@link KeySelector}. This setting only + * effects the how the outputs will be distributed between the parallel + * instances of the next processing operator. * * @param keySelector * @return @@ -314,7 +320,9 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are broadcasted to every parallel instance of the next component. + * are broadcasted to every parallel instance of the next component. This + * setting only effects the how the outputs will be distributed between the + * parallel instances of the next processing operator. * * @return The DataStream with broadcast partitioning set. */ @@ -324,7 +332,9 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are shuffled to the next component. + * are shuffled to the next component. This setting only effects the how the + * outputs will be distributed between the parallel instances of the next + * processing operator. * * @return The DataStream with shuffle partitioning set. */ @@ -334,8 +344,10 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are forwarded to the local subtask of the next component. This is the - * default partitioner setting. + * are forwarded to the local subtask of the next component (whenever + * possible). This is the default partitioner setting. This setting only + * effects the how the outputs will be distributed between the parallel + * instances of the next processing operator. * * @return The DataStream with shuffle partitioning set. 
*/ @@ -345,7 +357,9 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are distributed evenly to the next component. + * are distributed evenly to the next component.This setting only effects + * the how the outputs will be distributed between the parallel instances of + * the next processing operator. * * @return The DataStream with shuffle partitioning set. */ @@ -547,9 +561,9 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that that gives the sum of the pojo data stream at - * the given field expression. A field expression is either the name of a - * public field or a getter method with parentheses of the + * Applies an aggregation that that gives the current sum of the pojo data + * stream at the given field expression. A field expression is either the + * name of a public field or a getter method with parentheses of the * {@link DataStream}S underlying type. A dot can be used to drill down into * objects, as in {@code "field1.getInnerField2()" }. * @@ -563,8 +577,8 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that that gives the minimum of the data stream at - * the given position. + * Applies an aggregation that that gives the current minimum of the data + * stream at the given position. * * @param positionToMin * The position in the data point to minimize @@ -577,9 +591,9 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that that gives the minimum of the pojo data - * stream at the given field expression. A field expression is either the - * name of a public field or a getter method with parentheses of the + * Applies an aggregation that that gives the current minimum of the pojo + * data stream at the given field expression. A field expression is either + * the name of a public field or a getter method with parentheses of the * {@link DataStream}S underlying type. 
A dot can be used to drill down into * objects, as in {@code "field1.getInnerField2()" }. * @@ -594,8 +608,8 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that gives the maximum of the data stream at the - * given position. + * Applies an aggregation that gives the current maximum of the data stream + * at the given position. * * @param positionToMax * The position in the data point to maximize @@ -608,9 +622,9 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that that gives the maximum of the pojo data - * stream at the given field expression. A field expression is either the - * name of a public field or a getter method with parentheses of the + * Applies an aggregation that that gives the current maximum of the pojo + * data stream at the given field expression. A field expression is either + * the name of a public field or a getter method with parentheses of the * {@link DataStream}S underlying type. A dot can be used to drill down into * objects, as in {@code "field1.getInnerField2()" }. * @@ -625,11 +639,11 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that that gives the minimum element of the pojo - * data stream by the given field expression. A field expression is either - * the name of a public field or a getter method with parentheses of the - * {@link DataStream}S underlying type. A dot can be used to drill down into - * objects, as in {@code "field1.getInnerField2()" }. + * Applies an aggregation that that gives the current minimum element of the + * pojo data stream by the given field expression. A field expression is + * either the name of a public field or a getter method with parentheses of + * the {@link DataStream}S underlying type. A dot can be used to drill down + * into objects, as in {@code "field1.getInnerField2()" }. 
* * @param field * The field expression based on which the aggregation will be @@ -645,11 +659,11 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that that gives the maximum element of the pojo - * data stream by the given field expression. A field expression is either - * the name of a public field or a getter method with parentheses of the - * {@link DataStream}S underlying type. A dot can be used to drill down into - * objects, as in {@code "field1.getInnerField2()" }. + * Applies an aggregation that that gives the current maximum element of the + * pojo data stream by the given field expression. A field expression is + * either the name of a public field or a getter method with parentheses of + * the {@link DataStream}S underlying type. A dot can be used to drill down + * into objects, as in {@code "field1.getInnerField2()" }. * * @param field * The field expression based on which the aggregation will be @@ -677,7 +691,7 @@ public class DataStream<OUT> { public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } - + /** * Applies an aggregation that that gives the current element with the * minimum value at the given position, if more elements have the minimum @@ -724,7 +738,7 @@ public class DataStream<OUT> { public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } - + /** * Applies an aggregation that that gives the current element with the * maximum value at the given position, if more elements have the maximum @@ -759,7 +773,8 @@ public class DataStream<OUT> { } /** - * Applies an aggregation that gives the count of the values. + * Creates a new DataStream containing the current number (count) of + * received records. * * @return The transformed DataStream. */ @@ -826,9 +841,8 @@ public class DataStream<OUT> { * @return The closed DataStream. 
*/ public DataStreamSink<OUT> print() { - DataStream<OUT> inputStream = this.copy(); PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(); - DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType()); + DataStreamSink<OUT> returnStream = addSink(printFunction); return returnStream; } @@ -841,28 +855,13 @@ public class DataStream<OUT> { * @return The closed DataStream. */ public DataStreamSink<OUT> printToErr() { - DataStream<OUT> inputStream = this.copy(); PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true); - DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType()); + DataStreamSink<OUT> returnStream = addSink(printFunction); return returnStream; } /** - * Writes a DataStream to the file specified by path in text format. For - * every element of the DataStream the result of {@link Object#toString()} - * is written. - * - * @param path - * is the path to the location where the tuples are written - * - * @return The closed DataStream - */ - public DataStreamSink<OUT> writeAsText(String path) { - return writeAsText(this, path, new WriteFormatAsText<OUT>(), 1, null); - } - - /** * Writes a DataStream to the file specified by path in text format. The * writing is performed periodically, in every millis milliseconds. For * every element of the DataStream the result of {@link Object#toString()} @@ -876,122 +875,7 @@ public class DataStream<OUT> { * @return The closed DataStream */ public DataStreamSink<OUT> writeAsText(String path, long millis) { - return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, null); - } - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically in equally sized batches. For every - * element of the DataStream the result of {@link Object#toString()} is - * written. 
- * - * @param path - * is the path to the location where the tuples are written - * @param batchSize - * is the size of the batches, i.e. the number of tuples written - * to the file at a time - * - * @return The closed DataStream - */ - public DataStreamSink<OUT> writeAsText(String path, int batchSize) { - return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, null); - } - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. For - * every element of the DataStream the result of {@link Object#toString()} - * is written. - * - * @param path - * is the path to the location where the tuples are written - * @param millis - * is the file update frequency - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * system time. - * - * @return The closed DataStream - */ - public DataStreamSink<OUT> writeAsText(String path, long millis, OUT endTuple) { - return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, endTuple); - } - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically in equally sized batches. For every - * element of the DataStream the result of {@link Object#toString()} is - * written. - * - * @param path - * is the path to the location where the tuples are written - * @param batchSize - * is the size of the batches, i.e. the number of tuples written - * to the file at a time - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * batchSize. 
- * - * @return The closed DataStream - */ - public DataStreamSink<OUT> writeAsText(String path, int batchSize, OUT endTuple) { - return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, endTuple); - } - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. For - * every element of the DataStream the result of {@link Object#toString()} - * is written. - * - * @param path - * is the path to the location where the tuples are written - * @param millis - * is the file update frequency - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * system time. - * - * @return the data stream constructed - */ - private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path, - WriteFormatAsText<OUT> format, long millis, OUT endTuple) { - DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>( - path, format, millis, endTuple), inputStream.typeInfo); - return returnStream; - } - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically in equally sized batches. For every - * element of the DataStream the result of {@link Object#toString()} is - * written. - * - * @param path - * is the path to the location where the tuples are written - * @param batchSize - * is the size of the batches, i.e. the number of tuples written - * to the file at a time - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * batchSize. 
- * - * @return the data stream constructed - */ - private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path, - WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) { - DataStreamSink<OUT> returnStream = addSink(inputStream, - new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), - inputStream.typeInfo); - return returnStream; + return writeAsText(path, new WriteFormatAsText<OUT>(), millis); } /** @@ -1004,8 +888,8 @@ public class DataStream<OUT> { * * @return The closed DataStream */ - public DataStreamSink<OUT> writeAsCsv(String path) { - return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 1, null); + public DataStreamSink<OUT> writeAsText(String path) { + return writeAsText(path, 0); } /** @@ -1022,74 +906,28 @@ public class DataStream<OUT> { * @return The closed DataStream */ public DataStreamSink<OUT> writeAsCsv(String path, long millis) { - return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, null); - } - - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically in equally sized batches. For every - * element of the DataStream the result of {@link Object#toString()} is - * written. - * - * @param path - * is the path to the location where the tuples are written - * @param batchSize - * is the size of the batches, i.e. the number of tuples written - * to the file at a time - * - * @return The closed DataStream - */ - public DataStreamSink<OUT> writeAsCsv(String path, int batchSize) { - return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, null); + if (!getType().isTupleType()) { + throw new RuntimeException("Only tuple data streams can be written in csv format"); + } + return writeAsText(path, new WriteFormatAsCsv<OUT>(), millis); } /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. 
For + * Writes a DataStream to the file specified by path in text format. For * every element of the DataStream the result of {@link Object#toString()} * is written. * * @param path * is the path to the location where the tuples are written - * @param millis - * is the file update frequency - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * system time. * * @return The closed DataStream */ - public DataStreamSink<OUT> writeAsCsv(String path, long millis, OUT endTuple) { - return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple); + public DataStreamSink<OUT> writeAsCsv(String path) { + return writeAsCsv(path, 0); } /** * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically in equally sized batches. For every - * element of the DataStream the result of {@link Object#toString()} is - * written. - * - * @param path - * is the path to the location where the tuples are written - * @param batchSize - * is the size of the batches, i.e. the number of tuples written - * to the file at a time - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * batchSize. - * - * @return The closed DataStream - */ - public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) { - return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple); - } - - /** - * Writes a DataStream to the file specified by path in csv format. The * writing is performed periodically, in every millis milliseconds. For * every element of the DataStream the result of {@link Object#toString()} * is written. 
@@ -1098,45 +936,12 @@ public class DataStream<OUT> { * is the path to the location where the tuples are written * @param millis * is the file update frequency - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * system time. - * - * @return the data stream constructed - */ - private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path, - WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) { - DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>( - path, format, millis, endTuple), inputStream.typeInfo); - return returnStream; - } - - /** - * Writes a DataStream to the file specified by path in csv format. The - * writing is performed periodically in equally sized batches. For every - * element of the DataStream the result of {@link Object#toString()} is - * written. - * - * @param path - * is the path to the location where the tuples are written - * @param batchSize - * is the size of the batches, i.e. the number of tuples written - * to the file at a time - * @param endTuple - * is a special tuple indicating the end of the stream. If an - * endTuple is caught, the last pending batch of tuples will be - * immediately appended to the target file regardless of the - * batchSize. 
* * @return the data stream constructed */ - private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path, - WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) { - DataStreamSink<OUT> returnStream = addSink(inputStream, - new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), - inputStream.typeInfo); + private DataStreamSink<OUT> writeAsText(String path, WriteFormat<OUT> format, long millis) { + DataStreamSink<OUT> returnStream = addSink(new WriteSinkFunctionByMillis<OUT>(path, format, + millis)); return returnStream; } @@ -1241,21 +1046,13 @@ public class DataStream<OUT> { * @return The closed DataStream. */ public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) { - return addSink(this.copy(), sinkFunction); - } - - private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) { - return addSink(inputStream, sinkFunction, getType()); - } - private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, - SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) { DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo); jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>( - clean(sinkFunction)), inTypeInfo, null, "sink", degreeOfParallelism); + clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism); - inputStream.connectGraph(inputStream.copy(), returnStream.getId(), 0); + this.connectGraph(this.copy(), returnStream.getId(), 0); return returnStream; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f50ab91..6108975 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -212,7 +212,8 @@ public abstract class StreamExecutionEnvironment { /** * Creates a DataStream that represents the Strings produced by reading the * given file line wise multiple times(infinite). The file will be read with - * the system's default character set. + * the system's default character set. This functionality can be used for + * testing a topology. * * @param filePath * The path of the file, as a URI (e.g., @@ -350,8 +351,17 @@ public abstract class StreamExecutionEnvironment { return addSource(new GenSequenceFunction(from, to)); } + private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat, + TypeInformation<String> typeInfo) { + FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); + DataStreamSource<String> returnStream = addSource(function); + jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat); + return returnStream; + } + /** - * Ads a data source thus opening a {@link DataStream}. + * Create a DataStream using a user defined source function for arbitrary + * source functionality. 
* * @param function * the user defined function @@ -371,11 +381,27 @@ public abstract class StreamExecutionEnvironment { return returnStream; } - private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat, - TypeInformation<String> typeInfo) { - FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - DataStreamSource<String> returnStream = addSource(function); - jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat); + /** + * Ads a data source with a custom type information thus opening a + * {@link DataStream}. Only in very special cases does the user need to + * support type information. Otherwise use + * {@link #addSource(SourceFunction)} + * + * @param function + * the user defined function + * @param outTypeInfo + * the user defined type information for the stream + * @param <OUT> + * type of the returned stream + * @return the data stream constructed + */ + public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, + TypeInformation<OUT> outTypeInfo) { + + DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo); + + jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function), + null, outTypeInfo, "source", 1); return returnStream; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java index ebc38bb..b22fd80 100644 --- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java @@ -25,7 +25,7 @@ import java.util.ArrayList; /** * Writes tuples in csv format. - * + * * @param <IN> * Input tuple type */ @@ -37,8 +37,8 @@ public class WriteFormatAsCsv<IN> extends WriteFormat<IN> { try { PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true))); for (IN tupleToWrite : tupleList) { - outStream.println(tupleToWrite.toString().substring(1, - tupleToWrite.toString().length() - 1)); + String strTuple = tupleToWrite.toString(); + outStream.println(strTuple.substring(1, strTuple.length() - 1)); } outStream.close(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java index 01298b0..0c52afc 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java @@ -35,13 +35,11 @@ public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> { protected final String path; protected ArrayList<IN> tupleList = new ArrayList<IN>(); - protected final IN endTuple; protected WriteFormat<IN> format; - public WriteSinkFunction(String 
path, WriteFormat<IN> format, IN endTuple) { + public WriteSinkFunction(String path, WriteFormat<IN> format) { this.path = path; this.format = format; - this.endTuple = endTuple; cleanFile(path); } @@ -82,16 +80,13 @@ public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> { */ @Override public void invoke(IN tuple) { - if (!tuple.equals(endTuple)) { - tupleList.add(tuple); - if (updateCondition()) { - format.write(path, tupleList); - resetParameters(); - } - } else { + + tupleList.add(tuple); + if (updateCondition()) { format.write(path, tupleList); resetParameters(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java deleted file mode 100644 index 5012e25..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.function.sink; - - -/** - * Implementation of WriteSinkFunction. Writes tuples to file in equally sized - * batches. - * - * @param <IN> - * Input tuple type - */ -public class WriteSinkFunctionByBatches<IN> extends WriteSinkFunction<IN> { - private static final long serialVersionUID = 1L; - - private final int batchSize; - - public WriteSinkFunctionByBatches(String path, WriteFormat<IN> format, int batchSize, - IN endTuple) { - super(path, format, endTuple); - this.batchSize = batchSize; - } - - @Override - protected boolean updateCondition() { - return tupleList.size() >= batchSize; - } - - @Override - protected void resetParameters() { - tupleList.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java index 1fa978e..ee6df94 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java @@ -17,11 +17,10 @@ package org.apache.flink.streaming.api.function.sink; - /** * Implementation of WriteSinkFunction. Writes tuples to file in every millis * milliseconds. - * + * * @param <IN> * Input tuple type */ @@ -31,8 +30,8 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> { private final long millis; private long lastTime; - public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis, IN endTuple) { - super(path, format, endTuple); + public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) { + super(path, format); this.millis = millis; lastTime = System.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java deleted file mode 100644 index edd3ed5..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import static org.junit.Assert.assertTrue; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.Collector; -import org.junit.BeforeClass; -import org.junit.Test; - -public class WriteAsCsvTest { - - private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsCsvTest.class.getSimpleName() + "_"; - - private static final long MEMORYSIZE = 32; - - private static List<String> result1 = new ArrayList<String>(); - private static List<String> result2 = new ArrayList<String>(); - private static List<String> result3 = new ArrayList<String>(); - private static List<String> result4 = new ArrayList<String>(); - private static List<String> result5 = new ArrayList<String>(); - - private static List<String> expected1 = new ArrayList<String>(); - private static List<String> expected2 = new ArrayList<String>(); - private static List<String> expected3 = new ArrayList<String>(); - private static List<String> expected4 = new ArrayList<String>(); - private static List<String> expected5 = new ArrayList<String>(); - - public static final 
class MySource1 implements SourceFunction<Tuple1<Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public void invoke(Collector<Tuple1<Integer>> collector) throws Exception { - for (int i = 0; i < 27; i++) { - collector.collect(new Tuple1<Integer>(i)); - } - } - } - - private static void readFile(String path, List<String> result) { - try { - BufferedReader br = new BufferedReader(new FileReader(path)); - String line; - line = br.readLine(); - while (line != null) { - result.add(line); - line = br.readLine(); - } - br.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - private static void fillExpected1() { - for (int i = 0; i < 27; i++) { - expected1.add(i + ""); - } - } - - private static void fillExpected2() { - for (int i = 0; i < 25; i++) { - expected2.add(i + ""); - } - } - - private static void fillExpected3() { - for (int i = 0; i < 20; i++) { - expected3.add(i + ""); - } - } - - private static void fillExpected4() { - for (int i = 0; i < 26; i++) { - expected4.add(i + ""); - } - } - - private static void fillExpected5() { - for (int i = 0; i < 14; i++) { - expected5.add(i + ""); - } - - for (int i = 15; i < 25; i++) { - expected5.add(i + ""); - } - } - - @BeforeClass - public static void createFileCleanup() { - Runnable r = new Runnable() { - - @Override - public void run() { - try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {} - } - }; - - Runtime.getRuntime().addShutdownHook(new Thread(r)); - } - - @Test - public void test() throws Exception { - StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream1 = 
env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt"); - - fillExpected1(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test2.txt", 5); - - fillExpected2(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test3.txt", 10); - - fillExpected3(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26)); - - fillExpected4(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14)); - - fillExpected5(); - - env.execute(); - - readFile(PREFIX + "test1.txt", result1); - readFile(PREFIX + "test2.txt", result2); - readFile(PREFIX + "test3.txt", result3); - readFile(PREFIX + "test4.txt", result4); - readFile(PREFIX + "test5.txt", result5); - - assertTrue(expected1.equals(result1)); - assertTrue(expected2.equals(result2)); - assertTrue(expected3.equals(result3)); - assertTrue(expected4.equals(result4)); - assertTrue(expected5.equals(result5)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java deleted file mode 100644 index 1cad8a6..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import static org.junit.Assert.assertEquals; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.Collector; -import org.junit.BeforeClass; -import org.junit.Test; - -public class WriteAsTextTest { - - private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsTextTest.class.getSimpleName() + "_"; - - private static final long MEMORYSIZE = 32; - - private static List<String> result1 = new ArrayList<String>(); - private static List<String> result2 = new ArrayList<String>(); - private static List<String> result3 = new ArrayList<String>(); - private static List<String> result4 = new ArrayList<String>(); - private static List<String> result5 = new ArrayList<String>(); - - 
private static List<String> expected1 = new ArrayList<String>(); - private static List<String> expected2 = new ArrayList<String>(); - private static List<String> expected3 = new ArrayList<String>(); - private static List<String> expected4 = new ArrayList<String>(); - private static List<String> expected5 = new ArrayList<String>(); - - public static final class MySource1 implements SourceFunction<Tuple1<Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public void invoke(Collector<Tuple1<Integer>> collector) throws Exception { - for (int i = 0; i < 27; i++) { - collector.collect(new Tuple1<Integer>(i)); - } - } - } - - private static void readFile(String path, List<String> result) { - try { - BufferedReader br = new BufferedReader(new FileReader(path)); - String line; - line = br.readLine(); - while (line != null) { - result.add(line); - line = br.readLine(); - } - br.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - private static void fillExpected1() { - for (int i = 0; i < 27; i++) { - expected1.add("(" + i + ")"); - } - } - - private static void fillExpected2() { - for (int i = 0; i < 25; i++) { - expected2.add("(" + i + ")"); - } - } - - private static void fillExpected3() { - for (int i = 0; i < 20; i++) { - expected3.add("(" + i + ")"); - } - } - - private static void fillExpected4() { - for (int i = 0; i < 26; i++) { - expected4.add("(" + i + ")"); - } - } - - private static void fillExpected5() { - for (int i = 0; i < 14; i++) { - expected5.add("(" + i + ")"); - } - - for (int i = 15; i < 25; i++) { - expected5.add("(" + i + ")"); - } - } - - @BeforeClass - public static void createFileCleanup() { - Runnable r = new Runnable() { - - @Override - public void run() { - try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + 
"test4.txt").delete(); } catch (Throwable t) {} - try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {} - } - }; - - Runtime.getRuntime().addShutdownHook(new Thread(r)); - } - - @Test - public void test() throws Exception { - StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt"); - - fillExpected1(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsText(PREFIX + "test2.txt", 5); - - fillExpected2(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsText(PREFIX + "test3.txt", 10); - - fillExpected3(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26)); - - fillExpected4(); - - @SuppressWarnings("unused") - DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14)); - - fillExpected5(); - - env.execute(); - - readFile(PREFIX + "test1.txt", result1); - readFile(PREFIX + "test2.txt", result2); - readFile(PREFIX + "test3.txt", result3); - readFile(PREFIX + "test4.txt", result4); - readFile(PREFIX + "test5.txt", result5); - - assertEquals(expected1,result1); - assertEquals(expected2,result2); - assertEquals(expected3,result3); - assertEquals(expected4,result4); - assertEquals(expected5,result5); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index b10bdc6..e96f5eb 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.GroupedDataStream import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.streaming.api.function.sink.SinkFunction class DataStream[T](javaStream: JavaStream[T]) { @@ -86,15 +87,36 @@ class DataStream[T](javaStream: JavaStream[T]) { "parallelism.") } + /** + * Creates a new DataStream by merging DataStream outputs of + * the same type with each other. The DataStreams merged using this operator + * will be transformed simultaneously. + * + */ def merge(dataStreams: DataStream[T]*): DataStream[T] = new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*)) + /** + * Groups the elements of a DataStream by the given key positions (for tuple/array types) to + * be used with grouped operators like grouped reduce or grouped aggregations + * + */ def groupBy(fields: Int*): DataStream[T] = new DataStream[T](javaStream.groupBy(fields: _*)) + /** + * Groups the elements of a DataStream by the given field expressions to + * be used with grouped operators like grouped reduce or grouped aggregations + * + */ def groupBy(firstField: String, otherFields: String*): DataStream[T] = new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*)) + /** + * Groups the elements of a DataStream by the given K key to + * be used with grouped operators like grouped reduce or grouped aggregations + * + */ def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = { val keyExtractor = new KeySelector[T, K] { @@ -104,12 +126,27 @@ class DataStream[T](javaStream: 
JavaStream[T]) { new DataStream[T](javaStream.groupBy(keyExtractor)) } + /** + * Sets the partitioning of the DataStream so that the output is + * partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator. + * + */ def partitionBy(fields: Int*): DataStream[T] = new DataStream[T](javaStream.partitionBy(fields: _*)) + /** + * Sets the partitioning of the DataStream so that the output is + * partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator. + * + */ def partitionBy(firstField: String, otherFields: String*): DataStream[T] = new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*)) + /** + * Sets the partitioning of the DataStream so that the output is + * partitioned by the given Key. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator. + * + */ def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = { val keyExtractor = new KeySelector[T, K] { @@ -119,56 +156,124 @@ class DataStream[T](javaStream: JavaStream[T]) { new DataStream[T](javaStream.partitionBy(keyExtractor)) } + /** + * Sets the partitioning of the DataStream so that the output tuples + * are broadcasted to every parallel instance of the next component. This + * setting only effects the how the outputs will be distributed between the + * parallel instances of the next processing operator. + * + */ def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast()) + /** + * Sets the partitioning of the DataStream so that the output tuples + * are shuffled to the next component. This setting only effects the how the + * outputs will be distributed between the parallel instances of the next + * processing operator. 
+ * + */ def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle()) + /** + * Sets the partitioning of the DataStream so that the output tuples + * are forwarded to the local subtask of the next component (whenever + * possible). This is the default partitioner setting. This setting only + * effects the how the outputs will be distributed between the parallel + * instances of the next processing operator. + * + */ def forward: DataStream[T] = new DataStream[T](javaStream.forward()) + /** + * Sets the partitioning of the DataStream so that the output tuples + * are distributed evenly to the next component.This setting only effects + * the how the outputs will be distributed between the parallel instances of + * the next processing operator. + * + */ def distribute: DataStream[T] = new DataStream[T](javaStream.distribute()) + /** + * Applies an aggregation that that gives the current maximum of the data stream at + * the given position. + * + */ def max(field: Any): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.max(field)) case field: String => return new DataStream[T](javaStream.max(field)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Applies an aggregation that that gives the current minimum of the data stream at + * the given position. + * + */ def min(field: Any): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.min(field)) case field: String => return new DataStream[T](javaStream.min(field)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Applies an aggregation that sums the data stream at the given position. 
+ * + */ def sum(field: Any): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.sum(field)) case field: String => return new DataStream[T](javaStream.sum(field)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Applies an aggregation that that gives the current maximum element of the data stream by + * the given position. When equality, returns the first. + * + */ def maxBy(field: Any): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.maxBy(field)) case field: String => return new DataStream[T](javaStream.maxBy(field)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Applies an aggregation that that gives the current minimum element of the data stream by + * the given position. When equality, returns the first. + * + */ def minBy(field: Any): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.minBy(field)) case field: String => return new DataStream[T](javaStream.minBy(field)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Applies an aggregation that that gives the current minimum element of the data stream by + * the given position. When equality, the user can set to get the first or last element with the minimal value. 
+ * + */ def minBy(field: Any, first: Boolean): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.minBy(field, first)) case field: String => return new DataStream[T](javaStream.minBy(field, first)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Applies an aggregation that that gives the current maximum element of the data stream by + * the given position. When equality, the user can set to get the first or last element with the maximal value. + * + */ def maxBy(field: Any, first: Boolean): DataStream[T] = field match { case field: Int => return new DataStream[T](javaStream.maxBy(field, first)) case field: String => return new DataStream[T](javaStream.maxBy(field, first)) case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") } + /** + * Creates a new DataStream containing the current number (count) of + * received records. + * + */ def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count()) /** @@ -239,7 +344,7 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** - * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce + * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce * function. */ def reduce(reducer: ReduceFunction[T]): DataStream[T] = { @@ -253,7 +358,7 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** - * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce + * Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce * function. 
*/ def reduce(fun: (T, T) => T): DataStream[T] = { @@ -268,7 +373,7 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** - * Creates a new DataSet that contains only the elements satisfying the given filter predicate. + * Creates a new DataStream that contains only the elements satisfying the given filter predicate. */ def filter(filter: FilterFunction[T]): DataStream[T] = { if (filter == null) { @@ -277,6 +382,9 @@ class DataStream[T](javaStream: JavaStream[T]) { new DataStream[T](javaStream.filter(filter)) } + /** + * Creates a new DataStream that contains only the elements satisfying the given filter predicate. + */ def filter(fun: T => Boolean): DataStream[T] = { if (fun == null) { throw new NullPointerException("Filter function must not be null.") @@ -288,6 +396,71 @@ class DataStream[T](javaStream: JavaStream[T]) { this.filter(filter) } - def print() = javaStream.print() + /** + * Writes a DataStream to the standard output stream (stdout). For each + * element of the DataStream the result of .toString is + * written. + * + */ + def print(): DataStream[T] = new DataStream[T](javaStream.print()) + + /** + * Writes a DataStream to the file specified by path in text format. The + * writing is performed periodically, in every millis milliseconds. For + * every element of the DataStream the result of .toString + * is written. + * + */ + def writeAsText(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis)) + + /** + * Writes a DataStream to the file specified by path in text format. + * For every element of the DataStream the result of .toString + * is written. + * + */ + def writeAsText(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsText(path)) + + /** + * Writes a DataStream to the file specified by path in text format. The + * writing is performed periodically, in every millis milliseconds. For + * every element of the DataStream the result of .toString + * is written. 
+ * + */ + def writeAsCsv(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis)) + + /** + * Writes a DataStream to the file specified by path in text format. + * For every element of the DataStream the result of .toString + * is written. + * + */ + def writeAsCsv(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path)) + + /** + * Adds the given sink to this DataStream. Only streams with sinks added + * will be executed once the StreamExecutionEnvironment.execute(...) + * method is called. + * + */ + def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = new DataStream[T](javaStream.addSink(sinkFuntion)) + + /** + * Adds the given sink to this DataStream. Only streams with sinks added + * will be executed once the StreamExecutionEnvironment.execute(...) + * method is called. + * + */ + def addSink(fun: T => Unit): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Sink function must not be null.") + } + val sinkFunction = new SinkFunction[T] { + val cleanFun = clean(fun) + def invoke(in: T) = cleanFun(in) + } + this.addSink(sinkFunction) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala index df6c561..e4a7b48 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -25,6 +25,7 @@ import scala.reflect.ClassTag import org.apache.flink.streaming.api.datastream.DataStreamSource import 
org.apache.flink.streaming.api.invokable.SourceInvokable import org.apache.flink.streaming.api.function.source.FromElementsFunction +import org.apache.flink.streaming.api.function.source.SourceFunction class StreamExecutionEnvironment(javaEnv: JavaEnv) { @@ -44,10 +45,78 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { */ def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism + /** + * Sets the maximum time frequency (milliseconds) for the flushing of the + * output buffers. By default the output buffers flush frequently to provide + * low latency and to aid smooth developer experience. Setting the parameter + * can result in three logical modes: + * + * <ul> + * <li> + * A positive integer triggers flushing periodically by that integer</li> + * <li> + * 0 triggers flushing after every record thus minimizing latency</li> + * <li> + * -1 triggers flushing only when the output buffer is full thus maximizing + * throughput</li> + * </ul> + * + */ + def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = { + javaEnv.setBufferTimeout(timeoutMillis) + this + } + + /** + * Gets the default buffer timeout set for this environment + */ + def getBufferTimout: Long = javaEnv.getBufferTimeout() + + /** + * Creates a DataStream that represents the Strings produced by reading the + * given file line wise. The file will be read with the system's default + * character set. + * + */ + def readTextFile(filePath: String): DataStream[String] = + new DataStream[String](javaEnv.readTextFile(filePath)) + + /** + * Creates a DataStream that represents the Strings produced by reading the + * given file line wise multiple times(infinite). The file will be read with + * the system's default character set. This functionality can be used for + * testing a topology. 
+ * + */ + def readTextStream(StreamPath: String): DataStream[String] = + new DataStream[String](javaEnv.readTextStream(StreamPath)) + + /** + * Creates a new DataStream that contains the strings received infinitely + * from socket. Received strings are decoded by the system's default + * character set. + * + */ + def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] = + new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter)) + + /** + * Creates a new DataStream that contains the strings received infinitely + * from socket. Received strings are decoded by the system's default + * character set, uses '\n' as delimiter. + * + */ + def socketTextStream(hostname: String, port: Int): DataStream[String] = + new DataStream[String](javaEnv.socketTextStream(hostname, port)) + + /** + * Creates a new DataStream that contains a sequence of numbers. + * + */ def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new DataStream(javaEnv.generateSequence(from, to)) /** - * Creates a new data stream that contains the given elements. The elements must all be of the + * Creates a DataStream that contains the given elements. The elements must all be of the * same type and must be serializable. * * * Note that this operation will result in a non-parallel data source, i.e. a data source with @@ -78,8 +147,38 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { new DataStream(returnStream) } + /** + * Create a DataStream using a user defined source function for arbitrary + * source functionality. + * + */ + def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = { + Validate.notNull(function, "Function must not be null.") + val typeInfo = implicitly[TypeInformation[T]] + new DataStream[T](javaEnv.addSource(function, typeInfo)) + } + + /** + * Triggers the program execution. The environment will execute all parts of + * the program that have resulted in a "sink" operation. 
Sink operations are + * for example printing results or forwarding them to a message queue. + * <p> + * The program execution will be logged and displayed with a generated + * default name. + * + */ def execute() = javaEnv.execute() + /** + * Triggers the program execution. The environment will execute all parts of + * the program that have resulted in a "sink" operation. Sink operations are + * for example printing results or forwarding them to a message queue. + * <p> + * The program execution will be logged and displayed with the provided name + * + */ + def execute(jobName: String) = javaEnv.execute(jobName) + } object StreamExecutionEnvironment { @@ -108,7 +207,7 @@ object StreamExecutionEnvironment { * Creates a remote execution environment. The remote environment sends (parts of) the program to * a cluster for execution. Note that all file paths used in the program must be accessible from * the cluster. The execution will use the cluster's default degree of parallelism, unless the - * parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]]. + * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]]. * * @param host The host name or address of the master (JobManager), * where the program should be executed.
