[streaming] [api-breaking] Minor DataStream cleanups

- Removed unused constructor parameter.
- Updated outdated and incorrect javadocs of the connection (partitioning) methods.

Closes #825


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce3bc9c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce3bc9c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce3bc9c0

Branch: refs/heads/release-0.9
Commit: ce3bc9c0b85ef1c7c74e0a6ca61a9521610df781
Parents: 0edc0c8
Author: mbalassi <mbala...@apache.org>
Authored: Thu Jun 11 12:27:21 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Fri Jun 12 10:05:30 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |  2 +-
 .../streaming/api/datastream/DataStream.java    | 49 +++++++++++---------
 .../api/datastream/DataStreamSink.java          |  2 +-
 .../api/datastream/DataStreamSource.java        |  2 +-
 .../datastream/SingleOutputStreamOperator.java  |  4 +-
 5 files changed, 33 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 0176277..2626d9c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -482,7 +482,7 @@ public class ConnectedDataStream<IN1, IN2> {
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
                SingleOutputStreamOperator<OUT, ?> returnStream = new 
SingleOutputStreamOperator(
-                               environment, functionName, outTypeInfo, 
operator);
+                               environment, outTypeInfo, operator);
 
                dataStream1.streamGraph.addCoOperator(returnStream.getId(), 
operator, getType1(),
                                getType2(), outTypeInfo, functionName);

http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 09f8155..6964a07 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -125,12 +125,10 @@ public class DataStream<OUT> {
         * 
         * @param environment
         *            StreamExecutionEnvironment
-        * @param operatorType
-        *            The type of the operator in the component
         * @param typeInfo
         *            Type of the datastream
         */
-       public DataStream(StreamExecutionEnvironment environment, String 
operatorType, TypeInformation<OUT> typeInfo) {
+       public DataStream(StreamExecutionEnvironment environment, 
TypeInformation<OUT> typeInfo) {
                if (environment == null) {
                        throw new NullPointerException("context is null");
                }
@@ -174,7 +172,7 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Returns the ID of the {@link DataStream}.
+        * Returns the ID of the {@link DataStream} in the current {@link 
StreamExecutionEnvironment}.
         * 
         * @return ID of the DataStream
         */
@@ -420,9 +418,11 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are broadcasted to every parallel instance of the next component. 
This
-        * setting only effects the how the outputs will be distributed between 
the
-        * parallel instances of the next processing operator.
+        * are broadcasted to every parallel instance of the next component.
+        *
+        * <p>
+        * This setting only affects how the outputs will be distributed 
between
+        * the parallel instances of the next processing operator.
         * 
         * @return The DataStream with broadcast partitioning set.
         */
@@ -432,9 +432,11 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are shuffled to the next component. This setting only effects the 
how the
-        * outputs will be distributed between the parallel instances of the 
next
-        * processing operator.
+        * are shuffled uniformly at random to the next component.
+        *
+        * <p>
+        * This setting only affects how the outputs will be distributed 
between
+        * the parallel instances of the next processing operator.
         * 
         * @return The DataStream with shuffle partitioning set.
         */
@@ -445,11 +447,13 @@ public class DataStream<OUT> {
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
         * are forwarded to the local subtask of the next component (whenever
-        * possible). This is the default partitioner setting. This setting only
-        * effects the how the outputs will be distributed between the parallel
-        * instances of the next processing operator.
+        * possible).
+        *
+        * <p>
+        * This setting only affects how the outputs will be distributed 
between
+        * the parallel instances of the next processing operator.
         * 
-        * @return The DataStream with shuffle partitioning set.
+        * @return The DataStream with forward partitioning set.
         */
        public DataStream<OUT> forward() {
                return setConnectionType(new RebalancePartitioner<OUT>(true));
@@ -457,11 +461,14 @@ public class DataStream<OUT> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
-        * are distributed evenly to the next component.This setting only 
effects
-        * the how the outputs will be distributed between the parallel 
instances of
-        * the next processing operator.
+        * are distributed evenly to instances of the next component in a 
round-robin
+        * fashion.
+        *
+        * <p>
+        * This setting only affects how the outputs will be distributed 
between
+        * the parallel instances of the next processing operator.
         * 
-        * @return The DataStream with shuffle partitioning set.
+        * @return The DataStream with rebalance partitioning set.
         */
        public DataStream<OUT> rebalance() {
                return setConnectionType(new RebalancePartitioner<OUT>(false));
@@ -1237,7 +1244,7 @@ public class DataStream<OUT> {
 
        /**
         * Method for passing user defined operators along with the type
-        * informations that will transform the DataStream.
+        * information that will transform the DataStream.
         * 
         * @param operatorName
         *            name of the operator, for logging purposes
@@ -1254,7 +1261,7 @@ public class DataStream<OUT> {
                DataStream<OUT> inputStream = this.copy();
                @SuppressWarnings({ "unchecked", "rawtypes" })
                SingleOutputStreamOperator<R, ?> returnStream = new 
SingleOutputStreamOperator(environment,
-                               operatorName, outTypeInfo, operator);
+                               outTypeInfo, operator);
 
                streamGraph.addOperator(returnStream.getId(), operator, 
getType(), outTypeInfo,
                                operatorName);
@@ -1337,7 +1344,7 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Gets the class of the field at the given position
+        * Gets the class of the field at the given position.
         * 
         * @param pos
         *            Position of the field

http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index dc457e0..60dc367 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -31,7 +31,7 @@ public class DataStreamSink<IN> extends 
SingleOutputStreamOperator<IN, DataStrea
 
        protected DataStreamSink(StreamExecutionEnvironment environment, String 
operatorType,
                        TypeInformation<IN> outTypeInfo, 
OneInputStreamOperator<IN, ?> operator) {
-               super(environment, operatorType, outTypeInfo, operator);
+               super(environment, outTypeInfo, operator);
        }
 
        protected DataStreamSink(DataStream<IN> dataStream) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 2bb70df..0dd7701 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -34,7 +34,7 @@ public class DataStreamSource<OUT> extends 
SingleOutputStreamOperator<OUT, DataS
        public DataStreamSource(StreamExecutionEnvironment environment, String 
operatorType,
                        TypeInformation<OUT> outTypeInfo, StreamOperator<OUT> 
operator,
                        boolean isParallel, String sourceName) {
-               super(environment, operatorType, outTypeInfo, operator);
+               super(environment, outTypeInfo, operator);
 
                environment.getStreamGraph().addSource(getId(), operator, null, 
outTypeInfo,
                                sourceName);

http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 1e5b5cf..b4a99c8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -64,8 +64,8 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
        }
 
        protected SingleOutputStreamOperator(StreamExecutionEnvironment 
environment,
-                       String operatorType, TypeInformation<OUT> outTypeInfo, 
StreamOperator<?> operator) {
-               super(environment, operatorType, outTypeInfo);
+                       TypeInformation<OUT> outTypeInfo, StreamOperator<?> 
operator) {
+               super(environment, outTypeInfo);
                this.isSplit = false;
                this.operator = operator;
        }

Reply via email to