[streaming] [api-breaking] Minor DataStream cleanups - Removed unused constructor parameter. - Updated outdated and wrong connection javadocs.
Closes #825 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce3bc9c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce3bc9c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce3bc9c0 Branch: refs/heads/release-0.9 Commit: ce3bc9c0b85ef1c7c74e0a6ca61a9521610df781 Parents: 0edc0c8 Author: mbalassi <mbala...@apache.org> Authored: Thu Jun 11 12:27:21 2015 +0200 Committer: mbalassi <mbala...@apache.org> Committed: Fri Jun 12 10:05:30 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/ConnectedDataStream.java | 2 +- .../streaming/api/datastream/DataStream.java | 49 +++++++++++--------- .../api/datastream/DataStreamSink.java | 2 +- .../api/datastream/DataStreamSource.java | 2 +- .../datastream/SingleOutputStreamOperator.java | 4 +- 5 files changed, 33 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 0176277..2626d9c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -482,7 +482,7 @@ public class ConnectedDataStream<IN1, IN2> { @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator( - environment, functionName, outTypeInfo, operator); + environment, outTypeInfo, operator); dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(), getType2(), outTypeInfo, functionName); http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 09f8155..6964a07 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -125,12 +125,10 @@ public class DataStream<OUT> { * * @param environment * StreamExecutionEnvironment - * @param operatorType - * The type of the operator in the component * @param typeInfo * Type of the datastream */ - public DataStream(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> typeInfo) { + public DataStream(StreamExecutionEnvironment environment, TypeInformation<OUT> typeInfo) { if (environment == null) { throw new NullPointerException("context is null"); } @@ -174,7 +172,7 @@ public class DataStream<OUT> { } /** - * Returns the ID of the {@link DataStream}. + * Returns the ID of the {@link DataStream} in the current {@link StreamExecutionEnvironment}. * * @return ID of the DataStream */ @@ -420,9 +418,11 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are broadcasted to every parallel instance of the next component. This - * setting only effects the how the outputs will be distributed between the - * parallel instances of the next processing operator. + * are broadcasted to every parallel instance of the next component. + * + * <p> + * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * * @return The DataStream with broadcast partitioning set. */ @@ -432,9 +432,11 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are shuffled to the next component. This setting only effects the how the - * outputs will be distributed between the parallel instances of the next - * processing operator. + * are shuffled uniformly randomly to the next component. + * + * <p> + * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * * @return The DataStream with shuffle partitioning set. */ @@ -445,11 +447,13 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples * are forwarded to the local subtask of the next component (whenever - * possible). This is the default partitioner setting. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. + * possible). + * + * <p> + * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * - * @return The DataStream with shuffle partitioning set. + * @return The DataStream with forward partitioning set. */ public DataStream<OUT> forward() { return setConnectionType(new RebalancePartitioner<OUT>(true)); @@ -457,11 +461,14 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are distributed evenly to the next component.This setting only effects - * the how the outputs will be distributed between the parallel instances of - * the next processing operator. + * are distributed evenly to instances of the next component in a Round-robin + * fashion. + * + * <p> + * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * - * @return The DataStream with shuffle partitioning set. + * @return The DataStream with rebalance partitioning set. */ public DataStream<OUT> rebalance() { return setConnectionType(new RebalancePartitioner<OUT>(false)); @@ -1237,7 +1244,7 @@ public class DataStream<OUT> { /** * Method for passing user defined operators along with the type - * informations that will transform the DataStream. + * information that will transform the DataStream. * * @param operatorName * name of the operator, for logging purposes @@ -1254,7 +1261,7 @@ public class DataStream<OUT> { DataStream<OUT> inputStream = this.copy(); @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, - operatorName, outTypeInfo, operator); + outTypeInfo, operator); streamGraph.addOperator(returnStream.getId(), operator, getType(), outTypeInfo, operatorName); @@ -1337,7 +1344,7 @@ public class DataStream<OUT> { } /** - * Gets the class of the field at the given position + * Gets the class of the field at the given position. * * @param pos * Position of the field http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index dc457e0..60dc367 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -31,7 +31,7 @@ public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStrea protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeInformation<IN> outTypeInfo, OneInputStreamOperator<IN, ?> operator) { - super(environment, operatorType, outTypeInfo, operator); + super(environment, outTypeInfo, operator); } protected DataStreamSink(DataStream<IN> dataStream) { http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index 2bb70df..0dd7701 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -34,7 +34,7 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> outTypeInfo, StreamOperator<OUT> operator, boolean isParallel, String sourceName) { - super(environment, operatorType, outTypeInfo, operator); + super(environment, outTypeInfo, operator); environment.getStreamGraph().addSource(getId(), operator, null, outTypeInfo, sourceName); http://git-wip-us.apache.org/repos/asf/flink/blob/ce3bc9c0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 1e5b5cf..b4a99c8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -64,8 +64,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato } protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, - String operatorType, TypeInformation<OUT> outTypeInfo, StreamOperator<?> operator) { - super(environment, operatorType, outTypeInfo); + TypeInformation<OUT> outTypeInfo, StreamOperator<?> operator) { + super(environment, outTypeInfo); this.isSplit = false; this.operator = operator; }