Harmonize generic parameter names in Stream API classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86c45bfa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86c45bfa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86c45bfa Branch: refs/heads/master Commit: 86c45bfa2c760ca99741fa866f777730514c7986 Parents: 05d2138 Author: Aljoscha Krettek <[email protected]> Authored: Thu Sep 24 16:40:46 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:04:16 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/GroupedDataStream.java | 68 +++--- .../api/datastream/KeyedDataStream.java | 18 +- .../api/datastream/KeyedWindowDataStream.java | 22 +- .../api/datastream/WindowedDataStream.java | 239 +++++++++---------- 4 files changed, 173 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 50bf341..fde5a6d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce; * partitioned by the given {@link KeySelector}. Operators like {@link #reduce}, * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to * get additional functionality by the grouping. - * - * @param <OUT> - * The output type of the {@link GroupedDataStream}. + * + * @param <T> The type of the elements in the Grouped Stream. + * @param <KEY> The type of the key in the Keyed Stream. */ -public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { +public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> { /** * Creates a new {@link GroupedDataStream}, group inclusion is determined using @@ -48,7 +48,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * @param dataStream Base stream of data * @param keySelector Function for determining group inclusion */ - public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) { + public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) { super(dataStream, keySelector); } @@ -64,8 +64,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * element of the input values with the same key. * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) { - return transform("Grouped Reduce", getType(), new StreamGroupedReduce<OUT>( + public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) { + return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>( clean(reducer), keySelector)); } @@ -82,12 +82,12 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The initialValue passed to the folders for each key. * @return The transformed DataStream. */ - public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) { + public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) { TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(), Utils.getCallLocationName(), true); - return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder), + return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder), keySelector, initialValue)); } @@ -100,8 +100,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to sum * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) { - return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig())); + public SingleOutputStreamOperator<T, ?> sum(int positionToSum) { + return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig())); } /** @@ -117,8 +117,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * applied. * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> sum(String field) { - return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig())); + public SingleOutputStreamOperator<T, ?> sum(String field) { + return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig())); } /** @@ -130,8 +130,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to minimize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) { - return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN, + public SingleOutputStreamOperator<T, ?> min(int positionToMin) { + return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN, getExecutionConfig())); } @@ -148,8 +148,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * applied. * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> min(String field) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN, + public SingleOutputStreamOperator<T, ?> min(String field) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN, false, getExecutionConfig())); } @@ -162,8 +162,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to maximize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) { - return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX, + public SingleOutputStreamOperator<T, ?> max(int positionToMax) { + return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX, getExecutionConfig())); } @@ -180,8 +180,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * applied. * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> max(String field) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX, + public SingleOutputStreamOperator<T, ?> max(String field) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX, false, getExecutionConfig())); } @@ -202,7 +202,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * @return The transformed DataStream. */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) { + public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) { return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY, first, getExecutionConfig())); } @@ -223,8 +223,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * be returned * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY, + public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY, first, getExecutionConfig())); } @@ -238,7 +238,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to minimize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) { + public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -252,7 +252,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to minimize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) { + public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -270,8 +270,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * minimal value, otherwise returns the last * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) { - return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first, + public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) { + return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first, getExecutionConfig())); } @@ -285,7 +285,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to maximize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) { + public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -299,7 +299,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * The position in the data point to maximize * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) { + public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -317,13 +317,13 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { * maximum value, otherwise returns the last * @return The transformed DataStream. */ - public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) { - return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first, + public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) { + return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first, getExecutionConfig())); } - protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) { - StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector); + protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) { + StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector); return transform("Grouped Aggregation", getType(), operator); } } http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java index a32cf53..611953e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java @@ -34,11 +34,11 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; * * * @param <T> The type of the elements in the Keyed Stream. - * @param <K> The type of the key in the Keyed Stream. + * @param <KEY> The type of the key in the Keyed Stream. */ -public class KeyedDataStream<T, K> extends DataStream<T> { +public class KeyedDataStream<T, KEY> extends DataStream<T> { - protected final KeySelector<T, K> keySelector; + protected final KeySelector<T, KEY> keySelector; /** * Creates a new {@link KeyedDataStream} using the given {@link KeySelector} @@ -49,13 +49,13 @@ public class KeyedDataStream<T, K> extends DataStream<T> { * @param keySelector * Function for determining state partitions */ - public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) { + public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) { super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector))); this.keySelector = keySelector; } - public KeySelector<T, K> getKeySelector() { + public KeySelector<T, KEY> getKeySelector() { return this.keySelector; } @@ -98,8 +98,8 @@ public class KeyedDataStream<T, K> extends DataStream<T> { * @param policy The policy that defines the window. * @return The windows data stream. */ - public KeyedWindowDataStream<T, K> window(WindowPolicy policy) { - return new KeyedWindowDataStream<T, K>(this, policy); + public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) { + return new KeyedWindowDataStream<T, KEY>(this, policy); } /** @@ -112,7 +112,7 @@ public class KeyedDataStream<T, K> extends DataStream<T> { * @param slide The additional policy defining the slide of the window. * @return The windows data stream. */ - public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) { - return new KeyedWindowDataStream<T, K>(this, window, slide); + public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) { + return new KeyedWindowDataStream<T, KEY>(this, window, slide); } } http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java index 37151d7..711a959 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java @@ -43,13 +43,13 @@ import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator; * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation * over the window into one single operation. * - * @param <Type> The type of elements in the stream. - * @param <Key> The type of the key by which elements are grouped. + * @param <T> The type of elements in the stream. + * @param <K> The type of the key by which elements are grouped. */ -public class KeyedWindowDataStream<Type, Key> { +public class KeyedWindowDataStream<T, K> { /** The keyed data stream that is windowed by this stream */ - private final KeyedDataStream<Type, Key> input; + private final KeyedDataStream<T, K> input; /** The core window policy */ private final WindowPolicy windowPolicy; @@ -58,11 +58,11 @@ public class KeyedWindowDataStream<Type, Key> { private final WindowPolicy slidePolicy; - public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) { + public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy) { this(input, windowPolicy, null); } - public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, + public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy, WindowPolicy slidePolicy) { TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic(); @@ -91,7 +91,7 @@ public class KeyedWindowDataStream<Type, Key> { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - public DataStream<Type> reduceWindow(ReduceFunction<Type> function) { + public DataStream<T> reduceWindow(ReduceFunction<T> function) { String callLocation = Utils.getCallLocationName(); return createWindowOperator(function, input.getType(), "Reduce at " + callLocation); } @@ -107,10 +107,10 @@ public class KeyedWindowDataStream<Type, Key> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) { + public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K> function) { String callLocation = Utils.getCallLocationName(); - TypeInformation<Type> inType = input.getType(); + TypeInformation<T> inType = input.getType(); TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType( function, KeyedWindowFunction.class, true, true, inType, null, false); @@ -125,9 +125,9 @@ public class KeyedWindowDataStream<Type, Key> { Function function, TypeInformation<Result> resultType, String functionName) { String opName = windowPolicy.toString(slidePolicy) + " of " + functionName; - KeySelector<Type, Key> keySel = input.getKeySelector(); + KeySelector<T, K> keySel = input.getKeySelector(); - OneInputStreamOperator<Type, Result> operator = + OneInputStreamOperator<T, Result> operator = PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel); return input.transform(opName, resultType, operator); http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 1226adf..ef6f53b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -80,46 +80,45 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil; * can be applied to the windows. The results of these transformations are also * WindowedDataStreams of the same discretisation unit. * - * @param <OUT> - * The output type of the {@link WindowedDataStream} + * @param <T> The output type of the {@link WindowedDataStream} */ -public class WindowedDataStream<OUT> { +public class WindowedDataStream<T> { - protected DataStream<OUT> dataStream; + protected DataStream<T> dataStream; protected boolean isLocal = false; - protected KeySelector<OUT, ?> discretizerKey; - protected KeySelector<OUT, ?> groupByKey; + protected KeySelector<T, ?> discretizerKey; + protected KeySelector<T, ?> groupByKey; - protected WindowingHelper<OUT> triggerHelper; - protected WindowingHelper<OUT> evictionHelper; + protected WindowingHelper<T> triggerHelper; + protected WindowingHelper<T> evictionHelper; - protected TriggerPolicy<OUT> userTrigger; - protected EvictionPolicy<OUT> userEvicter; + protected TriggerPolicy<T> userTrigger; + protected EvictionPolicy<T> userEvicter; - protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> policyHelper) { + protected WindowedDataStream(DataStream<T> dataStream, WindowingHelper<T> policyHelper) { this.dataStream = dataStream; this.triggerHelper = policyHelper; if (dataStream instanceof GroupedDataStream) { - this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector; + this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector; } } - protected WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> trigger, - EvictionPolicy<OUT> evicter) { + protected WindowedDataStream(DataStream<T> dataStream, TriggerPolicy<T> trigger, + EvictionPolicy<T> evicter) { this.dataStream = dataStream; this.userTrigger = trigger; this.userEvicter = evicter; if (dataStream instanceof GroupedDataStream) { - this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector; + this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector; } } - protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) { + protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) { this.dataStream = windowedDataStream.dataStream; this.discretizerKey = windowedDataStream.discretizerKey; this.groupByKey = windowedDataStream.groupByKey; @@ -148,9 +147,9 @@ public class WindowedDataStream<OUT> { * @return The windowed data stream with triggering set */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public WindowedDataStream<OUT> every(WindowingHelper policyHelper) { + public WindowedDataStream<T> every(WindowingHelper policyHelper) { policyHelper.setExecutionConfig(getExecutionConfig()); - WindowedDataStream<OUT> ret = this.copy(); + WindowedDataStream<T> ret = this.copy(); if (ret.evictionHelper == null) { ret.evictionHelper = ret.triggerHelper; ret.triggerHelper = policyHelper; @@ -171,11 +170,11 @@ public class WindowedDataStream<OUT> { * The position of the fields to group by. * @return The grouped {@link WindowedDataStream} */ - public WindowedDataStream<OUT> groupBy(int... fields) { + public WindowedDataStream<T> groupBy(int... fields) { if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { - return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields)); + return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields)); } else { - return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType())); + return groupBy(new Keys.ExpressionKeys<T>(fields, getType())); } } @@ -195,8 +194,8 @@ public class WindowedDataStream<OUT> { * The fields to group by * @return The grouped {@link WindowedDataStream} */ - public WindowedDataStream<OUT> groupBy(String... fields) { - return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType())); + public WindowedDataStream<T> groupBy(String... fields) { + return groupBy(new Keys.ExpressionKeys<T>(fields, getType())); } /** @@ -211,13 +210,13 @@ public class WindowedDataStream<OUT> { * The keySelector used to extract the key for grouping. * @return The grouped {@link WindowedDataStream} */ - public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) { - WindowedDataStream<OUT> ret = this.copy(); + public WindowedDataStream<T> groupBy(KeySelector<T, ?> keySelector) { + WindowedDataStream<T> ret = this.copy(); ret.groupByKey = keySelector; return ret; } - private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) { + private WindowedDataStream<T> groupBy(Keys<T> keys) { return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))); } @@ -228,8 +227,8 @@ public class WindowedDataStream<OUT> { * * @return The WindowedDataStream with local discretisation */ - public WindowedDataStream<OUT> local() { - WindowedDataStream<OUT> out = copy(); + public WindowedDataStream<T> local() { + WindowedDataStream<T> out = copy(); out.isLocal = true; return out; } @@ -241,11 +240,11 @@ public class WindowedDataStream<OUT> { * * @return The discretised stream */ - public DataStream<StreamWindow<OUT>> getDiscretizedStream() { + public DataStream<StreamWindow<T>> getDiscretizedStream() { if (getEviction() instanceof KeepAllEvictionPolicy) { throw new RuntimeException("Cannot get discretized stream for full stream window"); } - return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>()) + return discretize(WindowTransformation.NONE, new BasicWindowBuffer<T>()) .getDiscretizedStream(); } @@ -255,7 +254,7 @@ public class WindowedDataStream<OUT> { * * @return The data stream consisting of the individual records. */ - public DataStream<OUT> flatten() { + public DataStream<T> flatten() { return dataStream; } @@ -269,7 +268,7 @@ public class WindowedDataStream<OUT> { * The reduce function that will be applied to the windows. * @return The transformed DataStream */ - public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) { + public DiscretizedStream<T> reduceWindow(ReduceFunction<T> reduceFunction) { // We check whether we should apply parallel time discretization, which // is a more complex exploiting the monotonic properties of time @@ -281,9 +280,9 @@ public class WindowedDataStream<OUT> { WindowTransformation transformation = WindowTransformation.REDUCEWINDOW .with(clean(reduceFunction)); - WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation); + WindowBuffer<T> windowBuffer = getWindowBuffer(transformation); - DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer); + DiscretizedStream<T> discretized = discretize(transformation, windowBuffer); if (windowBuffer instanceof PreAggregator) { return discretized; @@ -310,11 +309,11 @@ public class WindowedDataStream<OUT> { * The output type of the operator * @return The transformed DataStream */ - public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction, + public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction, TypeInformation<R> outType) { return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)), - new BasicWindowBuffer<OUT>()).foldWindow(initialValue, foldFunction, outType); + new BasicWindowBuffer<T>()).foldWindow(initialValue, foldFunction, outType); } @@ -330,7 +329,7 @@ public class WindowedDataStream<OUT> { * Initial value given to foldFunction * @return The transformed DataStream */ - public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction) { + public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction) { TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction), getType()); @@ -350,7 +349,7 @@ public class WindowedDataStream<OUT> { * The function that will be applied to the windows. * @return The transformed DataStream */ - public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) { + public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction) { return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)), getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction); } @@ -373,7 +372,7 @@ public class WindowedDataStream<OUT> { * The output type of the operator. * @return The transformed DataStream */ - public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, + public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction, TypeInformation<R> outType) { return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction), @@ -381,24 +380,24 @@ public class WindowedDataStream<OUT> { outType); } - private DiscretizedStream<OUT> discretize(WindowTransformation transformation, - WindowBuffer<OUT> windowBuffer) { + private DiscretizedStream<T> discretize(WindowTransformation transformation, + WindowBuffer<T> windowBuffer) { - OneInputStreamOperator<OUT, WindowEvent<OUT>> discretizer = getDiscretizer(); + OneInputStreamOperator<T, WindowEvent<T>> discretizer = getDiscretizer(); - OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> bufferOperator = getBufferOperator(windowBuffer); + OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> bufferOperator = getBufferOperator(windowBuffer); @SuppressWarnings({ "unchecked", "rawtypes" }) - TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class, + TypeInformation<WindowEvent<T>> bufferEventType = new TupleTypeInfo(WindowEvent.class, getType(), BasicTypeInfo.INT_TYPE_INFO); int parallelism = getDiscretizerParallelism(transformation); - return new DiscretizedStream<OUT>(dataStream + return new DiscretizedStream<T>(dataStream .transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer) .setParallelism(parallelism) .transform(windowBuffer.getClass().getSimpleName(), - new StreamWindowTypeInfo<OUT>(getType()), bufferOperator) + new StreamWindowTypeInfo<T>(getType()), bufferOperator) .setParallelism(parallelism), groupByKey, transformation, false); } @@ -430,7 +429,7 @@ public class WindowedDataStream<OUT> { * Reduce function to apply * @return The transformed stream */ - protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) { + protected DiscretizedStream<T> timeReduce(ReduceFunction<T> reduceFunction) { WindowTransformation transformation = WindowTransformation.REDUCEWINDOW .with(clean(reduceFunction)); @@ -438,7 +437,7 @@ public class WindowedDataStream<OUT> { // We get the windowbuffer and set it to emit empty windows with // sequential IDs. This logic is necessary to merge windows created in // parallel. - WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID(); + WindowBuffer<T> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID(); // If there is a groupby for the reduce operation we apply it before the // discretizers, because we will forward everything afterwards to @@ -449,7 +448,7 @@ public class WindowedDataStream<OUT> { // We discretize the stream and call the timeReduce function of the // discretized stream, we also pass the type of the windowbuffer - DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer); + DiscretizedStream<T> discretized = discretize(transformation, windowBuffer); if (getEviction() instanceof KeepAllEvictionPolicy && !(windowBuffer instanceof PreAggregator)) { @@ -464,27 +463,27 @@ public class WindowedDataStream<OUT> { /** * Based on the defined policies, returns the stream discretizer to be used */ - private OneInputStreamOperator<OUT, WindowEvent<OUT>> getDiscretizer() { + private OneInputStreamOperator<T, WindowEvent<T>> getDiscretizer() { if (discretizerKey == null) { - return new StreamDiscretizer<OUT>(getTrigger(), getEviction()); + return new StreamDiscretizer<T>(getTrigger(), getEviction()); } else if (getTrigger() instanceof CentralActiveTrigger) { - return new GroupedActiveDiscretizer<OUT>(discretizerKey, - (CentralActiveTrigger<OUT>) getTrigger(), - (CloneableEvictionPolicy<OUT>) getEviction()); + return new GroupedActiveDiscretizer<T>(discretizerKey, + (CentralActiveTrigger<T>) getTrigger(), + (CloneableEvictionPolicy<T>) getEviction()); } else { - return new GroupedStreamDiscretizer<OUT>(discretizerKey, - (CloneableTriggerPolicy<OUT>) getTrigger(), - (CloneableEvictionPolicy<OUT>) getEviction()); + return new GroupedStreamDiscretizer<T>(discretizerKey, + (CloneableTriggerPolicy<T>) getTrigger(), + (CloneableEvictionPolicy<T>) getEviction()); } } - private OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> getBufferOperator( - WindowBuffer<OUT> windowBuffer) { + private OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> getBufferOperator( + WindowBuffer<T> windowBuffer) { if (discretizerKey == null) { - return new StreamWindowBuffer<OUT>(windowBuffer); + return new StreamWindowBuffer<T>(windowBuffer); } else { - return new GroupedWindowBuffer<OUT>(windowBuffer, discretizerKey); + return new GroupedWindowBuffer<T>(windowBuffer, discretizerKey); } } @@ -496,43 +495,43 @@ public class WindowedDataStream<OUT> { * */ @SuppressWarnings("unchecked") - private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) { - TriggerPolicy<OUT> trigger = getTrigger(); - EvictionPolicy<OUT> eviction = getEviction(); + private WindowBuffer<T> getWindowBuffer(WindowTransformation transformation) { + TriggerPolicy<T> trigger = getTrigger(); + EvictionPolicy<T> eviction = getEviction(); if (transformation == WindowTransformation.REDUCEWINDOW) { if (WindowUtils.isTumblingPolicy(trigger, eviction)) { if (eviction instanceof KeepAllEvictionPolicy) { if (groupByKey == null) { - return new TumblingPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), getType() + return new TumblingPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), getType() .createSerializer(getExecutionConfig())).noEvict(); } else { - return new TumblingGroupedPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, + return new TumblingGroupedPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType().createSerializer(getExecutionConfig())).noEvict(); } } else { if (groupByKey == null) { - return new TumblingPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), getType() + return new TumblingPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), getType() .createSerializer(getExecutionConfig())); } else { - return new TumblingGroupedPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, + return new TumblingGroupedPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType().createSerializer(getExecutionConfig())); } } } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) { if (groupByKey == null) { - return new SlidingCountPreReducer<OUT>( - clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream + return new SlidingCountPreReducer<T>( + clean((ReduceFunction<T>) transformation.getUDF()), dataStream .getType().createSerializer(getExecutionConfig()), WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), ((CountTriggerPolicy<?>) trigger).getStart()); } else { - return new SlidingCountGroupedPreReducer<OUT>( - clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream + return new SlidingCountGroupedPreReducer<T>( + clean((ReduceFunction<T>) transformation.getUDF()), dataStream .getType().createSerializer(getExecutionConfig()), groupByKey, WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), ((CountTriggerPolicy<?>) trigger).getStart()); @@ -540,14 +539,14 @@ public class WindowedDataStream<OUT> { } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) { if (groupByKey == null) { - return new SlidingTimePreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType() + return new SlidingTimePreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), dataStream.getType() .createSerializer(getExecutionConfig()), WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), WindowUtils.getTimeStampWrapper(trigger)); } else { - return new SlidingTimeGroupedPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType() + return new SlidingTimeGroupedPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), dataStream.getType() .createSerializer(getExecutionConfig()), groupByKey, WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), WindowUtils.getTimeStampWrapper(trigger)); @@ -555,26 +554,26 @@ public class WindowedDataStream<OUT> { } else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) { if (groupByKey == null) { - return new JumpingCountPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), getType() + return new JumpingCountPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), getType() .createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction)); } else { - return new JumpingCountGroupedPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType() + return new JumpingCountGroupedPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType() .createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction)); } } else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) { if (groupByKey == null) { - return new JumpingTimePreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), getType() + return new JumpingTimePreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), getType() .createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction), WindowUtils.getTimeStampWrapper(trigger)); } else { - return new JumpingTimeGroupedPreReducer<OUT>( - (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType() + return new JumpingTimeGroupedPreReducer<T>( + (ReduceFunction<T>) transformation.getUDF(), groupByKey, getType() .createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction), WindowUtils.getTimeStampWrapper(trigger)); @@ -586,7 +585,7 @@ public class WindowedDataStream<OUT> { throw new RuntimeException( "Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations"); } else { - return new BasicWindowBuffer<OUT>(); + return new BasicWindowBuffer<T>(); } } @@ -598,8 +597,8 @@ public class WindowedDataStream<OUT> { * The position in the tuple/array to sum * @return The transformed DataStream. */ - public WindowedDataStream<OUT> sum(int positionToSum) { - return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig())); + public WindowedDataStream<T> sum(int positionToSum) { + return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig())); } /** @@ -613,8 +612,8 @@ public class WindowedDataStream<OUT> { * The field to sum * @return The transformed DataStream. */ - public WindowedDataStream<OUT> sum(String field) { - return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig())); + public WindowedDataStream<T> sum(String field) { + return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig())); } /** @@ -625,8 +624,8 @@ public class WindowedDataStream<OUT> { * The position to minimize * @return The transformed DataStream. */ - public WindowedDataStream<OUT> min(int positionToMin) { - return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN, + public WindowedDataStream<T> min(int positionToMin) { + return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN, getExecutionConfig())); } @@ -642,8 +641,8 @@ public class WindowedDataStream<OUT> { * applied. * @return The transformed DataStream. */ - public WindowedDataStream<OUT> min(String field) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN, + public WindowedDataStream<T> min(String field) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN, false, getExecutionConfig())); } @@ -656,7 +655,7 @@ public class WindowedDataStream<OUT> { * The position to minimize by * @return The transformed DataStream. */ - public WindowedDataStream<OUT> minBy(int positionToMinBy) { + public WindowedDataStream<T> minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -669,7 +668,7 @@ public class WindowedDataStream<OUT> { * The position to minimize by * @return The transformed DataStream. */ - public WindowedDataStream<OUT> minBy(String positionToMinBy) { + public WindowedDataStream<T> minBy(String positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -686,8 +685,8 @@ public class WindowedDataStream<OUT> { * minimum value, otherwise returns the last * @return The transformed DataStream. */ - public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean first) { - return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first, + public WindowedDataStream<T> minBy(int positionToMinBy, boolean first) { + return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first, getExecutionConfig())); } @@ -706,8 +705,8 @@ public class WindowedDataStream<OUT> { * be returned * @return The transformed DataStream. */ - public WindowedDataStream<OUT> minBy(String field, boolean first) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MINBY, + public WindowedDataStream<T> minBy(String field, boolean first) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MINBY, first, getExecutionConfig())); } @@ -719,8 +718,8 @@ public class WindowedDataStream<OUT> { * The position to maximize * @return The transformed DataStream. */ - public WindowedDataStream<OUT> max(int positionToMax) { - return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX, + public WindowedDataStream<T> max(int positionToMax) { + return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX, getExecutionConfig())); } @@ -736,8 +735,8 @@ public class WindowedDataStream<OUT> { * applied. * @return The transformed DataStream. */ - public WindowedDataStream<OUT> max(String field) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX, + public WindowedDataStream<T> max(String field) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX, false, getExecutionConfig())); } @@ -750,7 +749,7 @@ public class WindowedDataStream<OUT> { * The position to maximize by * @return The transformed DataStream. */ - public WindowedDataStream<OUT> maxBy(int positionToMaxBy) { + public WindowedDataStream<T> maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -763,7 +762,7 @@ public class WindowedDataStream<OUT> { * The position to maximize by * @return The transformed DataStream. */ - public WindowedDataStream<OUT> maxBy(String positionToMaxBy) { + public WindowedDataStream<T> maxBy(String positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -780,8 +779,8 @@ public class WindowedDataStream<OUT> { * maximum value, otherwise returns the last * @return The transformed DataStream. */ - public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean first) { - return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first, + public WindowedDataStream<T> maxBy(int positionToMaxBy, boolean first) { + return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first, getExecutionConfig())); } @@ -800,16 +799,16 @@ public class WindowedDataStream<OUT> { * be returned * @return The transformed DataStream. */ - public WindowedDataStream<OUT> maxBy(String field, boolean first) { - return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY, first, + public WindowedDataStream<T> maxBy(String field, boolean first) { + return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY, first, getExecutionConfig())); } - private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) { + private WindowedDataStream<T> aggregate(AggregationFunction<T> aggregator) { return reduceWindow(aggregator); } - protected TriggerPolicy<OUT> getTrigger() { + protected TriggerPolicy<T> getTrigger() { if (triggerHelper != null) { return triggerHelper.toTrigger(); @@ -821,7 +820,7 @@ public class WindowedDataStream<OUT> { } - protected EvictionPolicy<OUT> getEviction() { + protected EvictionPolicy<T> getEviction() { if (evictionHelper != null) { return evictionHelper.toEvict(); @@ -829,7 +828,7 @@ public class WindowedDataStream<OUT> { if (triggerHelper instanceof Time) { return triggerHelper.toEvict(); } else { - return new TumblingEvictionPolicy<OUT>(); + return new TumblingEvictionPolicy<T>(); } } else { return userEvicter; @@ -854,7 +853,7 @@ public class WindowedDataStream<OUT> { * * @return The output type. */ - public TypeInformation<OUT> getType() { + public TypeInformation<T> getType() { return dataStream.getType(); } @@ -862,7 +861,7 @@ public class WindowedDataStream<OUT> { return dataStream.getExecutionConfig(); } - protected WindowedDataStream<OUT> copy() { - return new WindowedDataStream<OUT>(this); + protected WindowedDataStream<T> copy() { + return new WindowedDataStream<T>(this); } }
