[scala] [streaming] Finished scala StreamExecutionEnvrionment functionality + 
DataStream sinks + docs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/de06d958
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/de06d958
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/de06d958

Branch: refs/heads/master
Commit: de06d9589cfab83183458d18eeeadd4d604c8a65
Parents: c123e11
Author: Gyula Fora <[email protected]>
Authored: Sat Dec 13 01:08:08 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 337 ++++---------------
 .../environment/StreamExecutionEnvironment.java |  40 ++-
 .../api/function/sink/WriteFormatAsCsv.java     |   6 +-
 .../api/function/sink/WriteSinkFunction.java    |  15 +-
 .../sink/WriteSinkFunctionByBatches.java        |  49 ---
 .../sink/WriteSinkFunctionByMillis.java         |   7 +-
 .../flink/streaming/api/WriteAsCsvTest.java     | 176 ----------
 .../flink/streaming/api/WriteAsTextTest.java    | 177 ----------
 .../flink/api/scala/streaming/DataStream.scala  | 181 +++++++++-
 .../streaming/StreamExecutionEnvironment.scala  | 103 +++++-
 10 files changed, 389 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1cf8d72..3a3cdc4 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,9 +44,9 @@ import 
org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormat;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -275,7 +275,9 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output is
-        * partitioned by the selected fields.
+        * partitioned by the selected fields. This setting only effects the 
how the
+        * outputs will be distributed between the parallel instances of the 
next
+        * processing operator.
         * 
         * @param fields
         *            The fields to partition by.
@@ -289,7 +291,9 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output is
-        * partitioned by the given field expressions.
+        * partitioned by the given field expressions. This setting only 
effects the
+        * how the outputs will be distributed between the parallel instances 
of the
+        * next processing operator.
         * 
         * @param fields
         *            The fields expressions to partition by.
@@ -303,7 +307,9 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output is
-        * partitioned using the given {@link KeySelector}.
+        * partitioned using the given {@link KeySelector}. This setting only
+        * effects the how the outputs will be distributed between the parallel
+        * instances of the next processing operator.
         * 
         * @param keySelector
         * @return
@@ -314,7 +320,9 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are broadcasted to every parallel instance of the next component.
+        * are broadcasted to every parallel instance of the next component. 
This
+        * setting only effects the how the outputs will be distributed between 
the
+        * parallel instances of the next processing operator.
         * 
         * @return The DataStream with broadcast partitioning set.
         */
@@ -324,7 +332,9 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are shuffled to the next component.
+        * are shuffled to the next component. This setting only effects the 
how the
+        * outputs will be distributed between the parallel instances of the 
next
+        * processing operator.
         * 
         * @return The DataStream with shuffle partitioning set.
         */
@@ -334,8 +344,10 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are forwarded to the local subtask of the next component. This is the
-        * default partitioner setting.
+        * are forwarded to the local subtask of the next component (whenever
+        * possible). This is the default partitioner setting. This setting only
+        * effects the how the outputs will be distributed between the parallel
+        * instances of the next processing operator.
         * 
         * @return The DataStream with shuffle partitioning set.
         */
@@ -345,7 +357,9 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are distributed evenly to the next component.
+        * are distributed evenly to the next component.This setting only 
effects
+        * the how the outputs will be distributed between the parallel 
instances of
+        * the next processing operator.
         * 
         * @return The DataStream with shuffle partitioning set.
         */
@@ -547,9 +561,9 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that that gives the sum of the pojo data 
stream at
-        * the given field expression. A field expression is either the name of 
a
-        * public field or a getter method with parentheses of the
+        * Applies an aggregation that that gives the current sum of the pojo 
data
+        * stream at the given field expression. A field expression is either 
the
+        * name of a public field or a getter method with parentheses of the
         * {@link DataStream}S underlying type. A dot can be used to drill down 
into
         * objects, as in {@code "field1.getInnerField2()" }.
         * 
@@ -563,8 +577,8 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that that gives the minimum of the data 
stream at
-        * the given position.
+        * Applies an aggregation that that gives the current minimum of the 
data
+        * stream at the given position.
         * 
         * @param positionToMin
         *            The position in the data point to minimize
@@ -577,9 +591,9 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that that gives the minimum of the pojo data
-        * stream at the given field expression. A field expression is either 
the
-        * name of a public field or a getter method with parentheses of the
+        * Applies an aggregation that that gives the current minimum of the 
pojo
+        * data stream at the given field expression. A field expression is 
either
+        * the name of a public field or a getter method with parentheses of the
         * {@link DataStream}S underlying type. A dot can be used to drill down 
into
         * objects, as in {@code "field1.getInnerField2()" }.
         * 
@@ -594,8 +608,8 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that gives the maximum of the data stream at 
the
-        * given position.
+        * Applies an aggregation that gives the current maximum of the data 
stream
+        * at the given position.
         * 
         * @param positionToMax
         *            The position in the data point to maximize
@@ -608,9 +622,9 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that that gives the maximum of the pojo data
-        * stream at the given field expression. A field expression is either 
the
-        * name of a public field or a getter method with parentheses of the
+        * Applies an aggregation that that gives the current maximum of the 
pojo
+        * data stream at the given field expression. A field expression is 
either
+        * the name of a public field or a getter method with parentheses of the
         * {@link DataStream}S underlying type. A dot can be used to drill down 
into
         * objects, as in {@code "field1.getInnerField2()" }.
         * 
@@ -625,11 +639,11 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that that gives the minimum element of the 
pojo
-        * data stream by the given field expression. A field expression is 
either
-        * the name of a public field or a getter method with parentheses of the
-        * {@link DataStream}S underlying type. A dot can be used to drill down 
into
-        * objects, as in {@code "field1.getInnerField2()" }.
+        * Applies an aggregation that that gives the current minimum element 
of the
+        * pojo data stream by the given field expression. A field expression is
+        * either the name of a public field or a getter method with 
parentheses of
+        * the {@link DataStream}S underlying type. A dot can be used to drill 
down
+        * into objects, as in {@code "field1.getInnerField2()" }.
         * 
         * @param field
         *            The field expression based on which the aggregation will 
be
@@ -645,11 +659,11 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that that gives the maximum element of the 
pojo
-        * data stream by the given field expression. A field expression is 
either
-        * the name of a public field or a getter method with parentheses of the
-        * {@link DataStream}S underlying type. A dot can be used to drill down 
into
-        * objects, as in {@code "field1.getInnerField2()" }.
+        * Applies an aggregation that that gives the current maximum element 
of the
+        * pojo data stream by the given field expression. A field expression is
+        * either the name of a public field or a getter method with 
parentheses of
+        * the {@link DataStream}S underlying type. A dot can be used to drill 
down
+        * into objects, as in {@code "field1.getInnerField2()" }.
         * 
         * @param field
         *            The field expression based on which the aggregation will 
be
@@ -677,7 +691,7 @@ public class DataStream<OUT> {
        public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
-       
+
        /**
         * Applies an aggregation that that gives the current element with the
         * minimum value at the given position, if more elements have the 
minimum
@@ -724,7 +738,7 @@ public class DataStream<OUT> {
        public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
-       
+
        /**
         * Applies an aggregation that that gives the current element with the
         * maximum value at the given position, if more elements have the 
maximum
@@ -759,7 +773,8 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Applies an aggregation that gives the count of the values.
+        * Creates a new DataStream containing the current number (count) of
+        * received records.
         * 
         * @return The transformed DataStream.
         */
@@ -826,9 +841,8 @@ public class DataStream<OUT> {
         * @return The closed DataStream.
         */
        public DataStreamSink<OUT> print() {
-               DataStream<OUT> inputStream = this.copy();
                PrintSinkFunction<OUT> printFunction = new 
PrintSinkFunction<OUT>();
-               DataStreamSink<OUT> returnStream = addSink(inputStream, 
printFunction, getType());
+               DataStreamSink<OUT> returnStream = addSink(printFunction);
 
                return returnStream;
        }
@@ -841,28 +855,13 @@ public class DataStream<OUT> {
         * @return The closed DataStream.
         */
        public DataStreamSink<OUT> printToErr() {
-               DataStream<OUT> inputStream = this.copy();
                PrintSinkFunction<OUT> printFunction = new 
PrintSinkFunction<OUT>(true);
-               DataStreamSink<OUT> returnStream = addSink(inputStream, 
printFunction, getType());
+               DataStreamSink<OUT> returnStream = addSink(printFunction);
 
                return returnStream;
        }
 
        /**
-        * Writes a DataStream to the file specified by path in text format. For
-        * every element of the DataStream the result of {@link 
Object#toString()}
-        * is written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * 
-        * @return The closed DataStream
-        */
-       public DataStreamSink<OUT> writeAsText(String path) {
-               return writeAsText(this, path, new WriteFormatAsText<OUT>(), 1, 
null);
-       }
-
-       /**
         * Writes a DataStream to the file specified by path in text format. The
         * writing is performed periodically, in every millis milliseconds. For
         * every element of the DataStream the result of {@link 
Object#toString()}
@@ -876,122 +875,7 @@ public class DataStream<OUT> {
         * @return The closed DataStream
         */
        public DataStreamSink<OUT> writeAsText(String path, long millis) {
-               return writeAsText(this, path, new WriteFormatAsText<OUT>(), 
millis, null);
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically in equally sized batches. For every
-        * element of the DataStream the result of {@link Object#toString()} is
-        * written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param batchSize
-        *            is the size of the batches, i.e. the number of tuples 
written
-        *            to the file at a time
-        * 
-        * @return The closed DataStream
-        */
-       public DataStreamSink<OUT> writeAsText(String path, int batchSize) {
-               return writeAsText(this, path, new WriteFormatAsText<OUT>(), 
batchSize, null);
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically, in every millis milliseconds. For
-        * every element of the DataStream the result of {@link 
Object#toString()}
-        * is written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param millis
-        *            is the file update frequency
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            system time.
-        * 
-        * @return The closed DataStream
-        */
-       public DataStreamSink<OUT> writeAsText(String path, long millis, OUT 
endTuple) {
-               return writeAsText(this, path, new WriteFormatAsText<OUT>(), 
millis, endTuple);
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically in equally sized batches. For every
-        * element of the DataStream the result of {@link Object#toString()} is
-        * written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param batchSize
-        *            is the size of the batches, i.e. the number of tuples 
written
-        *            to the file at a time
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            batchSize.
-        * 
-        * @return The closed DataStream
-        */
-       public DataStreamSink<OUT> writeAsText(String path, int batchSize, OUT 
endTuple) {
-               return writeAsText(this, path, new WriteFormatAsText<OUT>(), 
batchSize, endTuple);
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically, in every millis milliseconds. For
-        * every element of the DataStream the result of {@link 
Object#toString()}
-        * is written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param millis
-        *            is the file update frequency
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            system time.
-        * 
-        * @return the data stream constructed
-        */
-       private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, 
String path,
-                       WriteFormatAsText<OUT> format, long millis, OUT 
endTuple) {
-               DataStreamSink<OUT> returnStream = addSink(inputStream, new 
WriteSinkFunctionByMillis<OUT>(
-                               path, format, millis, endTuple), 
inputStream.typeInfo);
-               return returnStream;
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically in equally sized batches. For every
-        * element of the DataStream the result of {@link Object#toString()} is
-        * written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param batchSize
-        *            is the size of the batches, i.e. the number of tuples 
written
-        *            to the file at a time
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            batchSize.
-        * 
-        * @return the data stream constructed
-        */
-       private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, 
String path,
-                       WriteFormatAsText<OUT> format, int batchSize, OUT 
endTuple) {
-               DataStreamSink<OUT> returnStream = addSink(inputStream,
-                               new WriteSinkFunctionByBatches<OUT>(path, 
format, batchSize, endTuple),
-                               inputStream.typeInfo);
-               return returnStream;
+               return writeAsText(path, new WriteFormatAsText<OUT>(), millis);
        }
 
        /**
@@ -1004,8 +888,8 @@ public class DataStream<OUT> {
         * 
         * @return The closed DataStream
         */
-       public DataStreamSink<OUT> writeAsCsv(String path) {
-               return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 1, 
null);
+       public DataStreamSink<OUT> writeAsText(String path) {
+               return writeAsText(path, 0);
        }
 
        /**
@@ -1022,74 +906,28 @@ public class DataStream<OUT> {
         * @return The closed DataStream
         */
        public DataStreamSink<OUT> writeAsCsv(String path, long millis) {
-               return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 
millis, null);
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically in equally sized batches. For every
-        * element of the DataStream the result of {@link Object#toString()} is
-        * written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param batchSize
-        *            is the size of the batches, i.e. the number of tuples 
written
-        *            to the file at a time
-        * 
-        * @return The closed DataStream
-        */
-       public DataStreamSink<OUT> writeAsCsv(String path, int batchSize) {
-               return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 
batchSize, null);
+               if (!getType().isTupleType()) {
+                       throw new RuntimeException("Only tuple data streams can 
be written in csv format");
+               }
+               return writeAsText(path, new WriteFormatAsCsv<OUT>(), millis);
        }
 
        /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically, in every millis milliseconds. For
+        * Writes a DataStream to the file specified by path in text format. For
         * every element of the DataStream the result of {@link 
Object#toString()}
         * is written.
         * 
         * @param path
         *            is the path to the location where the tuples are written
-        * @param millis
-        *            is the file update frequency
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            system time.
         * 
         * @return The closed DataStream
         */
-       public DataStreamSink<OUT> writeAsCsv(String path, long millis, OUT 
endTuple) {
-               return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 
millis, endTuple);
+       public DataStreamSink<OUT> writeAsCsv(String path) {
+               return writeAsCsv(path, 0);
        }
 
        /**
         * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically in equally sized batches. For every
-        * element of the DataStream the result of {@link Object#toString()} is
-        * written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param batchSize
-        *            is the size of the batches, i.e. the number of tuples 
written
-        *            to the file at a time
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            batchSize.
-        * 
-        * @return The closed DataStream
-        */
-       public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT 
endTuple) {
-               return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 
batchSize, endTuple);
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in csv format. The
         * writing is performed periodically, in every millis milliseconds. For
         * every element of the DataStream the result of {@link 
Object#toString()}
         * is written.
@@ -1098,45 +936,12 @@ public class DataStream<OUT> {
         *            is the path to the location where the tuples are written
         * @param millis
         *            is the file update frequency
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            system time.
-        * 
-        * @return the data stream constructed
-        */
-       private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, 
String path,
-                       WriteFormatAsCsv<OUT> format, long millis, OUT 
endTuple) {
-               DataStreamSink<OUT> returnStream = addSink(inputStream, new 
WriteSinkFunctionByMillis<OUT>(
-                               path, format, millis, endTuple), 
inputStream.typeInfo);
-               return returnStream;
-       }
-
-       /**
-        * Writes a DataStream to the file specified by path in csv format. The
-        * writing is performed periodically in equally sized batches. For every
-        * element of the DataStream the result of {@link Object#toString()} is
-        * written.
-        * 
-        * @param path
-        *            is the path to the location where the tuples are written
-        * @param batchSize
-        *            is the size of the batches, i.e. the number of tuples 
written
-        *            to the file at a time
-        * @param endTuple
-        *            is a special tuple indicating the end of the stream. If an
-        *            endTuple is caught, the last pending batch of tuples will 
be
-        *            immediately appended to the target file regardless of the
-        *            batchSize.
         * 
         * @return the data stream constructed
         */
-       private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, 
String path,
-                       WriteFormatAsCsv<OUT> format, int batchSize, OUT 
endTuple) {
-               DataStreamSink<OUT> returnStream = addSink(inputStream,
-                               new WriteSinkFunctionByBatches<OUT>(path, 
format, batchSize, endTuple),
-                               inputStream.typeInfo);
+       private DataStreamSink<OUT> writeAsText(String path, WriteFormat<OUT> 
format, long millis) {
+               DataStreamSink<OUT> returnStream = addSink(new 
WriteSinkFunctionByMillis<OUT>(path, format,
+                               millis));
                return returnStream;
        }
 
@@ -1241,21 +1046,13 @@ public class DataStream<OUT> {
         * @return The closed DataStream.
         */
        public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
-               return addSink(this.copy(), sinkFunction);
-       }
-
-       private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, 
SinkFunction<OUT> sinkFunction) {
-               return addSink(inputStream, sinkFunction, getType());
-       }
 
-       private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-                       SinkFunction<OUT> sinkFunction, TypeInformation<OUT> 
inTypeInfo) {
                DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", typeInfo);
 
                jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SinkInvokable<OUT>(
-                               clean(sinkFunction)), inTypeInfo, null, "sink", 
degreeOfParallelism);
+                               clean(sinkFunction)), getType(), null, "sink", 
degreeOfParallelism);
 
-               inputStream.connectGraph(inputStream.copy(), 
returnStream.getId(), 0);
+               this.connectGraph(this.copy(), returnStream.getId(), 0);
 
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f50ab91..6108975 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -212,7 +212,8 @@ public abstract class StreamExecutionEnvironment {
        /**
         * Creates a DataStream that represents the Strings produced by reading 
the
         * given file line wise multiple times(infinite). The file will be read 
with
-        * the system's default character set.
+        * the system's default character set. This functionality can be used 
for
+        * testing a topology.
         * 
         * @param filePath
         *            The path of the file, as a URI (e.g.,
@@ -350,8 +351,17 @@ public abstract class StreamExecutionEnvironment {
                return addSource(new GenSequenceFunction(from, to));
        }
 
+       private DataStreamSource<String> addFileSource(InputFormat<String, ?> 
inputFormat,
+                       TypeInformation<String> typeInfo) {
+               FileSourceFunction function = new 
FileSourceFunction(inputFormat, typeInfo);
+               DataStreamSource<String> returnStream = addSource(function);
+               jobGraphBuilder.setInputFormat(returnStream.getId(), 
inputFormat);
+               return returnStream;
+       }
+
        /**
-        * Ads a data source thus opening a {@link DataStream}.
+        * Create a DataStream using a user defined source function for 
arbitrary
+        * source functionality.
         * 
         * @param function
         *            the user defined function
@@ -371,11 +381,27 @@ public abstract class StreamExecutionEnvironment {
                return returnStream;
        }
 
-       private DataStreamSource<String> addFileSource(InputFormat<String, ?> 
inputFormat,
-                       TypeInformation<String> typeInfo) {
-               FileSourceFunction function = new 
FileSourceFunction(inputFormat, typeInfo);
-               DataStreamSource<String> returnStream = addSource(function);
-               jobGraphBuilder.setInputFormat(returnStream.getId(), 
inputFormat);
+       /**
+        * Ads a data source with a custom type information thus opening a
+        * {@link DataStream}. Only in very special cases does the user need to
+        * support type information. Otherwise use
+        * {@link #addSource(SourceFunction)}
+        * 
+        * @param function
+        *            the user defined function
+        * @param outTypeInfo
+        *            the user defined type information for the stream
+        * @param <OUT>
+        *            type of the returned stream
+        * @return the data stream constructed
+        */
+       public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> 
function,
+                       TypeInformation<OUT> outTypeInfo) {
+
+               DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, "source", outTypeInfo);
+
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SourceInvokable<OUT>(function),
+                               null, outTypeInfo, "source", 1);
 
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
index ebc38bb..b22fd80 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
 
 /**
  * Writes tuples in csv format.
- *
+ * 
  * @param <IN>
  *            Input tuple type
  */
@@ -37,8 +37,8 @@ public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
                try {
                        PrintWriter outStream = new PrintWriter(new 
BufferedWriter(new FileWriter(path, true)));
                        for (IN tupleToWrite : tupleList) {
-                               
outStream.println(tupleToWrite.toString().substring(1,
-                                               
tupleToWrite.toString().length() - 1));
+                               String strTuple = tupleToWrite.toString();
+                               outStream.println(strTuple.substring(1, 
strTuple.length() - 1));
                        }
                        outStream.close();
                } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
index 01298b0..0c52afc 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -35,13 +35,11 @@ public abstract class WriteSinkFunction<IN> implements 
SinkFunction<IN> {
 
        protected final String path;
        protected ArrayList<IN> tupleList = new ArrayList<IN>();
-       protected final IN endTuple;
        protected WriteFormat<IN> format;
 
-       public WriteSinkFunction(String path, WriteFormat<IN> format, IN 
endTuple) {
+       public WriteSinkFunction(String path, WriteFormat<IN> format) {
                this.path = path;
                this.format = format;
-               this.endTuple = endTuple;
                cleanFile(path);
        }
 
@@ -82,16 +80,13 @@ public abstract class WriteSinkFunction<IN> implements 
SinkFunction<IN> {
         */
        @Override
        public void invoke(IN tuple) {
-               if (!tuple.equals(endTuple)) {
-                       tupleList.add(tuple);
-                       if (updateCondition()) {
-                               format.write(path, tupleList);
-                               resetParameters();
-                       }
-               } else {
+
+               tupleList.add(tuple);
+               if (updateCondition()) {
                        format.write(path, tupleList);
                        resetParameters();
                }
+
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
deleted file mode 100644
index 5012e25..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in equally sized
- * batches.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByBatches<IN> extends WriteSinkFunction<IN> {
-       private static final long serialVersionUID = 1L;
-
-       private final int batchSize;
-
-       public WriteSinkFunctionByBatches(String path, WriteFormat<IN> format, 
int batchSize,
-                       IN endTuple) {
-               super(path, format, endTuple);
-               this.batchSize = batchSize;
-       }
-
-       @Override
-       protected boolean updateCondition() {
-               return tupleList.size() >= batchSize;
-       }
-
-       @Override
-       protected void resetParameters() {
-               tupleList.clear();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index 1fa978e..ee6df94 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -17,11 +17,10 @@
 
 package org.apache.flink.streaming.api.function.sink;
 
-
 /**
  * Implementation of WriteSinkFunction. Writes tuples to file in every millis
  * milliseconds.
- *
+ * 
  * @param <IN>
  *            Input tuple type
  */
@@ -31,8 +30,8 @@ public class WriteSinkFunctionByMillis<IN> extends 
WriteSinkFunction<IN> {
        private final long millis;
        private long lastTime;
 
-       public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, 
long millis, IN endTuple) {
-               super(path, format, endTuple);
+       public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, 
long millis) {
+               super(path, format);
                this.millis = millis;
                lastTime = System.currentTimeMillis();
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
deleted file mode 100644
index edd3ed5..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class WriteAsCsvTest {
-       
-       private static final String PREFIX = 
System.getProperty("java.io.tmpdir") + "/" + 
WriteAsCsvTest.class.getSimpleName() + "_";
-       
-       private static final long MEMORYSIZE = 32;
-
-       private static List<String> result1 = new ArrayList<String>();
-       private static List<String> result2 = new ArrayList<String>();
-       private static List<String> result3 = new ArrayList<String>();
-       private static List<String> result4 = new ArrayList<String>();
-       private static List<String> result5 = new ArrayList<String>();
-
-       private static List<String> expected1 = new ArrayList<String>();
-       private static List<String> expected2 = new ArrayList<String>();
-       private static List<String> expected3 = new ArrayList<String>();
-       private static List<String> expected4 = new ArrayList<String>();
-       private static List<String> expected5 = new ArrayList<String>();
-
-       public static final class MySource1 implements 
SourceFunction<Tuple1<Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void invoke(Collector<Tuple1<Integer>> collector) throws 
Exception {
-                       for (int i = 0; i < 27; i++) {
-                               collector.collect(new Tuple1<Integer>(i));
-                       }
-               }
-       }
-
-       private static void readFile(String path, List<String> result) {
-               try {
-                       BufferedReader br = new BufferedReader(new 
FileReader(path));
-                       String line;
-                       line = br.readLine();
-                       while (line != null) {
-                               result.add(line);
-                               line = br.readLine();
-                       }
-                       br.close();
-               } catch (IOException e) {
-                       e.printStackTrace();
-               }
-       }
-
-       private static void fillExpected1() {
-               for (int i = 0; i < 27; i++) {
-                       expected1.add(i + "");
-               }
-       }
-
-       private static void fillExpected2() {
-               for (int i = 0; i < 25; i++) {
-                       expected2.add(i + "");
-               }
-       }
-
-       private static void fillExpected3() {
-               for (int i = 0; i < 20; i++) {
-                       expected3.add(i + "");
-               }
-       }
-
-       private static void fillExpected4() {
-               for (int i = 0; i < 26; i++) {
-                       expected4.add(i + "");
-               }
-       }
-
-       private static void fillExpected5() {
-               for (int i = 0; i < 14; i++) {
-                       expected5.add(i + "");
-               }
-
-               for (int i = 15; i < 25; i++) {
-                       expected5.add(i + "");
-               }
-       }
-
-       @BeforeClass
-       public static void createFileCleanup() {
-               Runnable r = new Runnable() {
-                       
-                       @Override
-                       public void run() {
-                               try { new File(PREFIX + "test1.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test2.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test3.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test4.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test5.txt").delete(); 
} catch (Throwable t) {}
-                       }
-               };
-               
-               Runtime.getRuntime().addShutdownHook(new Thread(r));
-       }
-       
-       @Test
-       public void test() throws Exception {
-               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new 
MySource1()).writeAsCsv(PREFIX + "test1.txt");
-
-               fillExpected1();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new 
MySource1()).writeAsCsv(PREFIX + "test2.txt", 5);
-
-               fillExpected2();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new 
MySource1()).writeAsCsv(PREFIX + "test3.txt", 10);
-
-               fillExpected3();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new 
MySource1()).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
-
-               fillExpected4();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new 
MySource1()).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
-
-               fillExpected5();
-
-               env.execute();
-
-               readFile(PREFIX + "test1.txt", result1);
-               readFile(PREFIX + "test2.txt", result2);
-               readFile(PREFIX + "test3.txt", result3);
-               readFile(PREFIX + "test4.txt", result4);
-               readFile(PREFIX + "test5.txt", result5);
-
-               assertTrue(expected1.equals(result1));
-               assertTrue(expected2.equals(result2));
-               assertTrue(expected3.equals(result3));
-               assertTrue(expected4.equals(result4));
-               assertTrue(expected5.equals(result5));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
deleted file mode 100644
index 1cad8a6..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class WriteAsTextTest {
-       
-       private static final String PREFIX = 
System.getProperty("java.io.tmpdir") + "/" + 
WriteAsTextTest.class.getSimpleName() + "_";
-       
-       private static final long MEMORYSIZE = 32;
-
-       private static List<String> result1 = new ArrayList<String>();
-       private static List<String> result2 = new ArrayList<String>();
-       private static List<String> result3 = new ArrayList<String>();
-       private static List<String> result4 = new ArrayList<String>();
-       private static List<String> result5 = new ArrayList<String>();
-
-       private static List<String> expected1 = new ArrayList<String>();
-       private static List<String> expected2 = new ArrayList<String>();
-       private static List<String> expected3 = new ArrayList<String>();
-       private static List<String> expected4 = new ArrayList<String>();
-       private static List<String> expected5 = new ArrayList<String>();
-
-       public static final class MySource1 implements 
SourceFunction<Tuple1<Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void invoke(Collector<Tuple1<Integer>> collector) throws 
Exception {
-                       for (int i = 0; i < 27; i++) {
-                               collector.collect(new Tuple1<Integer>(i));
-                       }
-               }
-       }
-
-       private static void readFile(String path, List<String> result) {
-               try {
-                       BufferedReader br = new BufferedReader(new 
FileReader(path));
-                       String line;
-                       line = br.readLine();
-                       while (line != null) {
-                               result.add(line);
-                               line = br.readLine();
-                       }
-                       br.close();
-               } catch (IOException e) {
-                       e.printStackTrace();
-               }
-       }
-
-       private static void fillExpected1() {
-               for (int i = 0; i < 27; i++) {
-                       expected1.add("(" + i + ")");
-               }
-       }
-
-       private static void fillExpected2() {
-               for (int i = 0; i < 25; i++) {
-                       expected2.add("(" + i + ")");
-               }
-       }
-
-       private static void fillExpected3() {
-               for (int i = 0; i < 20; i++) {
-                       expected3.add("(" + i + ")");
-               }
-       }
-
-       private static void fillExpected4() {
-               for (int i = 0; i < 26; i++) {
-                       expected4.add("(" + i + ")");
-               }
-       }
-
-       private static void fillExpected5() {
-               for (int i = 0; i < 14; i++) {
-                       expected5.add("(" + i + ")");
-               }
-
-               for (int i = 15; i < 25; i++) {
-                       expected5.add("(" + i + ")");
-               }
-       }
-       
-       @BeforeClass
-       public static void createFileCleanup() {
-               Runnable r = new Runnable() {
-                       
-                       @Override
-                       public void run() {
-                               try { new File(PREFIX + "test1.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test2.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test3.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test4.txt").delete(); 
} catch (Throwable t) {}
-                               try { new File(PREFIX + "test5.txt").delete(); 
} catch (Throwable t) {}
-                       }
-               };
-               
-               Runtime.getRuntime().addShutdownHook(new Thread(r));
-       }
-
-       @Test
-       public void test() throws Exception {
-               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
-               
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new 
MySource1()).writeAsText(PREFIX + "test1.txt");
-
-               fillExpected1();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new 
MySource1()).writeAsText(PREFIX + "test2.txt", 5);
-
-               fillExpected2();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new 
MySource1()).writeAsText(PREFIX + "test3.txt", 10);
-
-               fillExpected3();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new 
MySource1()).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
-
-               fillExpected4();
-
-               @SuppressWarnings("unused")
-               DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new 
MySource1()).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
-
-               fillExpected5();
-
-               env.execute();
-
-               readFile(PREFIX + "test1.txt", result1);
-               readFile(PREFIX + "test2.txt", result2);
-               readFile(PREFIX + "test3.txt", result3);
-               readFile(PREFIX + "test4.txt", result4);
-               readFile(PREFIX + "test5.txt", result5);
-
-               assertEquals(expected1,result1);
-               assertEquals(expected2,result2);
-               assertEquals(expected3,result3);
-               assertEquals(expected4,result4);
-               assertEquals(expected5,result5);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index b10bdc6..e96f5eb 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.datastream.GroupedDataStream
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.streaming.api.function.sink.SinkFunction
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -86,15 +87,36 @@ class DataStream[T](javaStream: JavaStream[T]) {
         "parallelism.")
   }
 
+  /**
+   * Creates a new DataStream by merging DataStream outputs of
+   * the same type with each other. The DataStreams merged using this operator
+   * will be transformed simultaneously.
+   *
+   */
   def merge(dataStreams: DataStream[T]*): DataStream[T] =
     new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
 
+  /**
+   * Groups the elements of a DataStream by the given key positions (for 
tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
   def groupBy(fields: Int*): DataStream[T] =
     new DataStream[T](javaStream.groupBy(fields: _*))
 
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
   def groupBy(firstField: String, otherFields: String*): DataStream[T] =
     new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: 
_*))
 
+  /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
   def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val keyExtractor = new KeySelector[T, K] {
@@ -104,12 +126,27 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new DataStream[T](javaStream.groupBy(keyExtractor))
   }
 
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only effects the how the 
outputs will be distributed between the parallel instances of the next 
processing operator.
+   *
+   */
   def partitionBy(fields: Int*): DataStream[T] =
     new DataStream[T](javaStream.partitionBy(fields: _*))
 
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only effects the how the 
outputs will be distributed between the parallel instances of the next 
processing operator.
+   *
+   */
   def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
     new DataStream[T](javaStream.partitionBy(firstField +: 
otherFields.toArray: _*))
 
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the given Key. This setting only effects the how the 
outputs will be distributed between the parallel instances of the next 
processing operator.
+   *
+   */
   def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val keyExtractor = new KeySelector[T, K] {
@@ -119,56 +156,124 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new DataStream[T](javaStream.partitionBy(keyExtractor))
   }
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are broadcasted to every parallel instance of the next component. This
+   * setting only effects the how the outputs will be distributed between the
+   * parallel instances of the next processing operator.
+   *
+   */
   def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are shuffled to the next component. This setting only effects the how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   *
+   */
   def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are forwarded to the local subtask of the next component (whenever
+   * possible). This is the default partitioner setting. This setting only
+   * effects the how the outputs will be distributed between the parallel
+   * instances of the next processing operator.
+   *
+   */
   def forward: DataStream[T] = new DataStream[T](javaStream.forward())
 
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are distributed evenly to the next component.This setting only effects
+   * the how the outputs will be distributed between the parallel instances of
+   * the next processing operator.
+   *
+   */
   def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
 
+  /**
+   * Applies an aggregation that that gives the current maximum of the data 
stream at
+   * the given position.
+   *
+   */
   def max(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.max(field))
     case field: String => return new DataStream[T](javaStream.max(field))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that that gives the current minimum of the data 
stream at
+   * the given position.
+   *
+   */
   def min(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.min(field))
     case field: String => return new DataStream[T](javaStream.min(field))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that sums the data stream at the given position.
+   *
+   */
   def sum(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.sum(field))
     case field: String => return new DataStream[T](javaStream.sum(field))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that that gives the current maximum element of the 
data stream by
+   * the given position. When equality, returns the first.
+   *
+   */
   def maxBy(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.maxBy(field))
     case field: String => return new DataStream[T](javaStream.maxBy(field))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that that gives the current minimum element of the 
data stream by
+   * the given position. When equality, returns the first.
+   *
+   */
   def minBy(field: Any): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.minBy(field))
     case field: String => return new DataStream[T](javaStream.minBy(field))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that that gives the current minimum element of the 
data stream by
+   * the given position. When equality, the user can set to get the first or 
last element with the minimal value.
+   *
+   */
   def minBy(field: Any, first: Boolean): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.minBy(field, first))
     case field: String => return new DataStream[T](javaStream.minBy(field, 
first))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Applies an aggregation that that gives the current maximum element of the 
data stream by
+   * the given position. When equality, the user can set to get the first or 
last element with the maximal value.
+   *
+   */
   def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
     case field: String => return new DataStream[T](javaStream.maxBy(field, 
first))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
   }
 
+  /**
+   * Creates a new DataStream containing the current number (count) of
+   * received records.
+   *
+   */
   def count: DataStream[java.lang.Long] = new 
DataStream[java.lang.Long](javaStream.count())
 
   /**
@@ -239,7 +344,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by merging the elements of this DataStream 
using an associative reduce
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream 
using an associative reduce
    * function.
    */
   def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
@@ -253,7 +358,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by merging the elements of this DataStream 
using an associative reduce
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream 
using an associative reduce
    * function.
    */
   def reduce(fun: (T, T) => T): DataStream[T] = {
@@ -268,7 +373,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new DataSet that contains only the elements satisfying the 
given filter predicate.
+   * Creates a new DataStream that contains only the elements satisfying the 
given filter predicate.
    */
   def filter(filter: FilterFunction[T]): DataStream[T] = {
     if (filter == null) {
@@ -277,6 +382,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new DataStream[T](javaStream.filter(filter))
   }
 
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the 
given filter predicate.
+   */
   def filter(fun: T => Boolean): DataStream[T] = {
     if (fun == null) {
       throw new NullPointerException("Filter function must not be null.")
@@ -288,6 +396,71 @@ class DataStream[T](javaStream: JavaStream[T]) {
     this.filter(filter)
   }
 
-  def print() = javaStream.print()
+  /**
+   * Writes a DataStream to the standard output stream (stdout). For each
+   * element of the DataStream the result of .toString is
+   * written.
+   *
+   */
+  def print(): DataStream[T] = new DataStream[T](javaStream.print())
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, in every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsText(path: String, millis: Long): DataStream[T] = new 
DataStream[T](javaStream.writeAsText(path, millis))
+
+  /**
+   * Writes a DataStream to the file specified by path in text format.
+   * For every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsText(path: String): DataStream[T] = new 
DataStream[T](javaStream.writeAsText(path))
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, in every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsCsv(path: String, millis: Long): DataStream[T] = new 
DataStream[T](javaStream.writeAsCsv(path, millis))
+
+  /**
+   * Writes a DataStream to the file specified by path in text format.
+   * For every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsCsv(path: String): DataStream[T] = new 
DataStream[T](javaStream.writeAsCsv(path))
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = new 
DataStream[T](javaStream.addSink(sinkFuntion))
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(fun: T => Unit): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Sink function must not be null.")
+    }
+    val sinkFunction = new SinkFunction[T] {
+      val cleanFun = clean(fun)
+      def invoke(in: T) = cleanFun(in)
+    }
+    this.addSink(sinkFunction)
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/de06d958/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index df6c561..e4a7b48 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.flink.streaming.api.datastream.DataStreamSource
 import org.apache.flink.streaming.api.invokable.SourceInvokable
 import org.apache.flink.streaming.api.function.source.FromElementsFunction
+import org.apache.flink.streaming.api.function.source.SourceFunction
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -44,10 +45,78 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
 
+  /**
+   * Sets the maximum time frequency (milliseconds) for the flushing of the
+   * output buffers. By default the output buffers flush frequently to provide
+   * low latency and to aid smooth developer experience. Setting the parameter
+   * can result in three logical modes:
+   *
+   * <ul>
+   * <li>
+   * A positive integer triggers flushing periodically by that integer</li>
+   * <li>
+   * 0 triggers flushing after every record thus minimizing latency</li>
+   * <li>
+   * -1 triggers flushing only when the output buffer is full thus maximizing
+   * throughput</li>
+   * </ul>
+   *
+   */
+  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
+    javaEnv.setBufferTimeout(timeoutMillis)
+    this
+  }
+
+  /**
+   * Gets the default buffer timeout set for this environment
+   */
+  def getBufferTimout: Long = javaEnv.getBufferTimeout()
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line wise. The file will be read with the system's default
+   * character set.
+   *
+   */
+  def readTextFile(filePath: String): DataStream[String] =
+    new DataStream[String](javaEnv.readTextFile(filePath))
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line wise multiple times(infinite). The file will be read with
+   * the system's default character set. This functionality can be used for
+   * testing a topology.
+   *
+   */
+  def readTextStream(StreamPath: String): DataStream[String] =
+    new DataStream[String](javaEnv.readTextStream(StreamPath))
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from socket. Received strings are decoded by the system's default
+   * character set.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int, delimiter: Char): 
DataStream[String] =
+    new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter))
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from socket. Received strings are decoded by the system's default
+   * character set, uses '\n' as delimiter.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int): DataStream[String] =
+    new DataStream[String](javaEnv.socketTextStream(hostname, port))
+
+  /**
+   * Creates a new DataStream that contains a sequence of numbers.
+   *
+   */
   def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new 
DataStream(javaEnv.generateSequence(from, to))
 
   /**
-   * Creates a new data stream that contains the given elements. The elements 
must all be of the
+   * Creates a DataStream that contains the given elements. The elements must 
all be of the
    * same type and must be serializable.
    *
    * * Note that this operation will result in a non-parallel data source, 
i.e. a data source with
@@ -78,8 +147,38 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     new DataStream(returnStream)
   }
 
+  /**
+   * Create a DataStream using a user defined source function for arbitrary
+   * source functionality.
+   *
+   */
+  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): 
DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+    new DataStream[T](javaEnv.addSource(function, typeInfo))
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   *
+   */
   def execute() = javaEnv.execute()
 
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with the provided name
+   *
+   */
+  def execute(jobName: String) = javaEnv.execute(jobName)
+
 }
 
 object StreamExecutionEnvironment {
@@ -108,7 +207,7 @@ object StreamExecutionEnvironment {
    * Creates a remote execution environment. The remote environment sends 
(parts of) the program to
    * a cluster for execution. Note that all file paths used in the program 
must be accessible from
    * the cluster. The execution will use the cluster's default degree of 
parallelism, unless the
-   * parallelism is set explicitly via 
[[ExecutionEnvironment.setDegreeOfParallelism()]].
+   * parallelism is set explicitly via 
[[StreamExecutionEnvironment.setDegreeOfParallelism()]].
    *
    * @param host The host name or address of the master (JobManager),
    *             where the program should be executed.

Reply via email to