Harmonize generic parameter names in Stream API classes

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86c45bfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86c45bfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86c45bfa

Branch: refs/heads/master
Commit: 86c45bfa2c760ca99741fa866f777730514c7986
Parents: 05d2138
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Sep 24 16:40:46 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/GroupedDataStream.java       |  68 +++---
 .../api/datastream/KeyedDataStream.java         |  18 +-
 .../api/datastream/KeyedWindowDataStream.java   |  22 +-
 .../api/datastream/WindowedDataStream.java      | 239 +++++++++----------
 4 files changed, 173 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 50bf341..fde5a6d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -35,11 +35,11 @@ import 
org.apache.flink.streaming.api.operators.StreamGroupedReduce;
  * partitioned by the given {@link KeySelector}. Operators like {@link 
#reduce},
  * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
  * get additional functionality by the grouping.
- * 
- * @param <OUT>
- *            The output type of the {@link GroupedDataStream}.
+ *
+ * @param <T> The type of the elements in the Grouped Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
  */
-public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
+public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
 
        /**
         * Creates a new {@link GroupedDataStream}, group inclusion is 
determined using
@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         * @param dataStream Base stream of data
         * @param keySelector Function for determining group inclusion
         */
-       public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, 
KEY> keySelector) {
+       public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> 
keySelector) {
                super(dataStream, keySelector);
        }
 
@@ -64,8 +64,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            element of the input values with the same key.
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reducer) {
-               return transform("Grouped Reduce", getType(), new 
StreamGroupedReduce<OUT>(
+       public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> 
reducer) {
+               return transform("Grouped Reduce", getType(), new 
StreamGroupedReduce<T>(
                                clean(reducer), keySelector));
        }
 
@@ -82,12 +82,12 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The initialValue passed to the folders for each key.
         * @return The transformed DataStream.
         */
-       public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, 
FoldFunction<OUT, R> folder) {
+       public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, 
FoldFunction<T, R> folder) {
 
                TypeInformation<R> outType = 
TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
                                Utils.getCallLocationName(), true);
 
-               return transform("Grouped Fold", outType, new 
StreamGroupedFold<OUT, R>(clean(folder),
+               return transform("Grouped Fold", outType, new 
StreamGroupedFold<T, R>(clean(folder),
                                keySelector, initialValue));
        }
 
@@ -100,8 +100,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to sum
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
-               return aggregate(new SumAggregator<OUT>(positionToSum, 
getType(), getExecutionConfig()));
+       public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+               return aggregate(new SumAggregator<T>(positionToSum, getType(), 
getExecutionConfig()));
        }
 
        /**
@@ -117,8 +117,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-               return aggregate(new SumAggregator<OUT>(field, getType(), 
getExecutionConfig()));
+       public SingleOutputStreamOperator<T, ?> sum(String field) {
+               return aggregate(new SumAggregator<T>(field, getType(), 
getExecutionConfig()));
        }
 
        /**
@@ -130,8 +130,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to minimize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMin, 
getType(), AggregationType.MIN,
+       public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+               return aggregate(new ComparableAggregator<T>(positionToMin, 
getType(), AggregationType.MIN,
                                getExecutionConfig()));
        }
 
@@ -148,8 +148,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> min(String field) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MIN,
+       public SingleOutputStreamOperator<T, ?> min(String field) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MIN,
                                false, getExecutionConfig()));
        }
 
@@ -162,8 +162,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to maximize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMax, 
getType(), AggregationType.MAX,
+       public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+               return aggregate(new ComparableAggregator<T>(positionToMax, 
getType(), AggregationType.MAX,
                                getExecutionConfig()));
        }
 
@@ -180,8 +180,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> max(String field) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MAX,
+       public SingleOutputStreamOperator<T, ?> max(String field) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MAX,
                                false, getExecutionConfig()));
        }
 
@@ -202,7 +202,7 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         * @return The transformed DataStream.
         */
        @SuppressWarnings({ "rawtypes", "unchecked" })
-       public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean 
first) {
+       public SingleOutputStreamOperator<T, ?> minBy(String field, boolean 
first) {
                return aggregate(new ComparableAggregator(field, getType(), 
AggregationType.MINBY,
                                first, getExecutionConfig()));
        }
@@ -223,8 +223,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            be returned
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean 
first) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MAXBY,
+       public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean 
first) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MAXBY,
                                first, getExecutionConfig()));
        }
 
@@ -238,7 +238,7 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to minimize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+       public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
 
@@ -252,7 +252,7 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to minimize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) 
{
+       public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
 
@@ -270,8 +270,8 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            minimal value, otherwise returns the last
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, 
boolean first) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMinBy, 
getType(), AggregationType.MINBY, first,
+       public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, 
boolean first) {
+               return aggregate(new ComparableAggregator<T>(positionToMinBy, 
getType(), AggregationType.MINBY, first,
                                getExecutionConfig()));
        }
 
@@ -285,7 +285,7 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to maximize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+       public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
 
@@ -299,7 +299,7 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            The position in the data point to maximize
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) 
{
+       public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
 
@@ -317,13 +317,13 @@ public class GroupedDataStream<OUT, KEY> extends 
KeyedDataStream<OUT, KEY> {
         *            maximum value, otherwise returns the last
         * @return The transformed DataStream.
         */
-       public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, 
boolean first) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, 
getType(), AggregationType.MAXBY, first,
+       public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, 
boolean first) {
+               return aggregate(new ComparableAggregator<T>(positionToMaxBy, 
getType(), AggregationType.MAXBY, first,
                                getExecutionConfig()));
        }
 
-       protected SingleOutputStreamOperator<OUT, ?> 
aggregate(AggregationFunction<OUT> aggregate) {
-               StreamGroupedReduce<OUT> operator = new 
StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
+       protected SingleOutputStreamOperator<T, ?> 
aggregate(AggregationFunction<T> aggregate) {
+               StreamGroupedReduce<T> operator = new 
StreamGroupedReduce<T>(clean(aggregate), keySelector);
                return transform("Grouped Aggregation", getType(), operator);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index a32cf53..611953e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -34,11 +34,11 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * 
  * 
  * @param <T> The type of the elements in the Keyed Stream.
- * @param <K> The type of the key in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
  */
-public class KeyedDataStream<T, K> extends DataStream<T> {
+public class KeyedDataStream<T, KEY> extends DataStream<T> {
        
-       protected final KeySelector<T, K> keySelector;
+       protected final KeySelector<T, KEY> keySelector;
 
        /**
         * Creates a new {@link KeyedDataStream} using the given {@link 
KeySelector}
@@ -49,13 +49,13 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
         * @param keySelector
         *            Function for determining state partitions
         */
-       public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> 
keySelector) {
+       public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> 
keySelector) {
                super(dataStream.getExecutionEnvironment(), new 
PartitionTransformation<T>(dataStream.getTransformation(), new 
HashPartitioner<T>(keySelector)));
                this.keySelector = keySelector;
        }
 
        
-       public KeySelector<T, K> getKeySelector() {
+       public KeySelector<T, KEY> getKeySelector() {
                return this.keySelector;
        }
 
@@ -98,8 +98,8 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
         * @param policy The policy that defines the window.
         * @return The windows data stream. 
         */
-       public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
-               return new KeyedWindowDataStream<T, K>(this, policy);
+       public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) {
+               return new KeyedWindowDataStream<T, KEY>(this, policy);
        }
 
        /**
@@ -112,7 +112,7 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
         * @param slide The additional policy defining the slide of the window. 
         * @return The windows data stream.
         */
-       public KeyedWindowDataStream<T, K> window(WindowPolicy window, 
WindowPolicy slide) {
-               return new KeyedWindowDataStream<T, K>(this, window, slide);
+       public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, 
WindowPolicy slide) {
+               return new KeyedWindowDataStream<T, KEY>(this, window, slide);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 37151d7..711a959 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -43,13 +43,13 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
  * KeyedWindowDataStream will be collapsed together with the KeyedDataStream 
and the operation
  * over the window into one single operation.
  * 
- * @param <Type> The type of elements in the stream.
- * @param <Key> The type of the key by which elements are grouped.
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
  */
-public class KeyedWindowDataStream<Type, Key> {
+public class KeyedWindowDataStream<T, K> {
 
        /** The keyed data stream that is windowed by this stream */
-       private final KeyedDataStream<Type, Key> input;
+       private final KeyedDataStream<T, K> input;
 
        /** The core window policy */
        private final WindowPolicy windowPolicy;
@@ -58,11 +58,11 @@ public class KeyedWindowDataStream<Type, Key> {
        private final WindowPolicy slidePolicy;
        
        
-       public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, 
WindowPolicy windowPolicy) {
+       public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy 
windowPolicy) {
                this(input, windowPolicy, null);
        }
 
-       public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
+       public KeyedWindowDataStream(KeyedDataStream<T, K> input,
                                                                WindowPolicy 
windowPolicy, WindowPolicy slidePolicy) 
        {
                TimeCharacteristic time = 
input.getExecutionEnvironment().getStreamTimeCharacteristic();
@@ -91,7 +91,7 @@ public class KeyedWindowDataStream<Type, Key> {
         * @param function The reduce function.
         * @return The data stream that is the result of applying the reduce 
function to the window. 
         */
-       public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
+       public DataStream<T> reduceWindow(ReduceFunction<T> function) {
                String callLocation = Utils.getCallLocationName();
                return createWindowOperator(function, input.getType(), "Reduce 
at " + callLocation);
        }
@@ -107,10 +107,10 @@ public class KeyedWindowDataStream<Type, Key> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, 
Result, Key> function) {
+       public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, 
Result, K> function) {
                String callLocation = Utils.getCallLocationName();
 
-               TypeInformation<Type> inType = input.getType();
+               TypeInformation<T> inType = input.getType();
                TypeInformation<Result> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
                                function, KeyedWindowFunction.class, true, 
true, inType, null, false);
 
@@ -125,9 +125,9 @@ public class KeyedWindowDataStream<Type, Key> {
                        Function function, TypeInformation<Result> resultType, 
String functionName) {
 
                String opName = windowPolicy.toString(slidePolicy) + " of " + 
functionName;
-               KeySelector<Type, Key> keySel = input.getKeySelector();
+               KeySelector<T, K> keySel = input.getKeySelector();
                
-               OneInputStreamOperator<Type, Result> operator =
+               OneInputStreamOperator<T, Result> operator =
                                
PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, 
keySel);
                
                return input.transform(opName, resultType, operator);

http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 1226adf..ef6f53b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -80,46 +80,45 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
  * can be applied to the windows. The results of these transformations are also
  * WindowedDataStreams of the same discretisation unit.
  * 
- * @param <OUT>
- *            The output type of the {@link WindowedDataStream}
+ * @param <T> The output type of the {@link WindowedDataStream}
  */
-public class WindowedDataStream<OUT> {
+public class WindowedDataStream<T> {
 
-       protected DataStream<OUT> dataStream;
+       protected DataStream<T> dataStream;
 
        protected boolean isLocal = false;
 
-       protected KeySelector<OUT, ?> discretizerKey;
-       protected KeySelector<OUT, ?> groupByKey;
+       protected KeySelector<T, ?> discretizerKey;
+       protected KeySelector<T, ?> groupByKey;
 
-       protected WindowingHelper<OUT> triggerHelper;
-       protected WindowingHelper<OUT> evictionHelper;
+       protected WindowingHelper<T> triggerHelper;
+       protected WindowingHelper<T> evictionHelper;
 
-       protected TriggerPolicy<OUT> userTrigger;
-       protected EvictionPolicy<OUT> userEvicter;
+       protected TriggerPolicy<T> userTrigger;
+       protected EvictionPolicy<T> userEvicter;
 
-       protected WindowedDataStream(DataStream<OUT> dataStream, 
WindowingHelper<OUT> policyHelper) {
+       protected WindowedDataStream(DataStream<T> dataStream, 
WindowingHelper<T> policyHelper) {
                this.dataStream = dataStream;
                this.triggerHelper = policyHelper;
 
                if (dataStream instanceof GroupedDataStream) {
-                       this.discretizerKey = ((GroupedDataStream<OUT, ?>) 
dataStream).keySelector;
+                       this.discretizerKey = ((GroupedDataStream<T, ?>) 
dataStream).keySelector;
                }
        }
 
-       protected WindowedDataStream(DataStream<OUT> dataStream, 
TriggerPolicy<OUT> trigger,
-                       EvictionPolicy<OUT> evicter) {
+       protected WindowedDataStream(DataStream<T> dataStream, TriggerPolicy<T> 
trigger,
+                       EvictionPolicy<T> evicter) {
                this.dataStream = dataStream;
 
                this.userTrigger = trigger;
                this.userEvicter = evicter;
 
                if (dataStream instanceof GroupedDataStream) {
-                       this.discretizerKey = ((GroupedDataStream<OUT, ?>) 
dataStream).keySelector;
+                       this.discretizerKey = ((GroupedDataStream<T, ?>) 
dataStream).keySelector;
                }
        }
 
-       protected WindowedDataStream(WindowedDataStream<OUT> 
windowedDataStream) {
+       protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
                this.dataStream = windowedDataStream.dataStream;
                this.discretizerKey = windowedDataStream.discretizerKey;
                this.groupByKey = windowedDataStream.groupByKey;
@@ -148,9 +147,9 @@ public class WindowedDataStream<OUT> {
         * @return The windowed data stream with triggering set
         */
        @SuppressWarnings({ "unchecked", "rawtypes" })
-       public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+       public WindowedDataStream<T> every(WindowingHelper policyHelper) {
                policyHelper.setExecutionConfig(getExecutionConfig());
-               WindowedDataStream<OUT> ret = this.copy();
+               WindowedDataStream<T> ret = this.copy();
                if (ret.evictionHelper == null) {
                        ret.evictionHelper = ret.triggerHelper;
                        ret.triggerHelper = policyHelper;
@@ -171,11 +170,11 @@ public class WindowedDataStream<OUT> {
         *            The position of the fields to group by.
         * @return The grouped {@link WindowedDataStream}
         */
-       public WindowedDataStream<OUT> groupBy(int... fields) {
+       public WindowedDataStream<T> groupBy(int... fields) {
                if (getType() instanceof BasicArrayTypeInfo || getType() 
instanceof PrimitiveArrayTypeInfo) {
-                       return groupBy(new 
KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+                       return groupBy(new 
KeySelectorUtil.ArrayKeySelector<T>(fields));
                } else {
-                       return groupBy(new Keys.ExpressionKeys<OUT>(fields, 
getType()));
+                       return groupBy(new Keys.ExpressionKeys<T>(fields, 
getType()));
                }
        }
 
@@ -195,8 +194,8 @@ public class WindowedDataStream<OUT> {
         *            The fields to group by
         * @return The grouped {@link WindowedDataStream}
         */
-       public WindowedDataStream<OUT> groupBy(String... fields) {
-               return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+       public WindowedDataStream<T> groupBy(String... fields) {
+               return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
        }
 
        /**
@@ -211,13 +210,13 @@ public class WindowedDataStream<OUT> {
         *            The keySelector used to extract the key for grouping.
         * @return The grouped {@link WindowedDataStream}
         */
-       public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) 
{
-               WindowedDataStream<OUT> ret = this.copy();
+       public WindowedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
+               WindowedDataStream<T> ret = this.copy();
                ret.groupByKey = keySelector;
                return ret;
        }
 
-       private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
+       private WindowedDataStream<T> groupBy(Keys<T> keys) {
                return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, 
getType(),
                                getExecutionConfig())));
        }
@@ -228,8 +227,8 @@ public class WindowedDataStream<OUT> {
         * 
         * @return The WindowedDataStream with local discretisation
         */
-       public WindowedDataStream<OUT> local() {
-               WindowedDataStream<OUT> out = copy();
+       public WindowedDataStream<T> local() {
+               WindowedDataStream<T> out = copy();
                out.isLocal = true;
                return out;
        }
@@ -241,11 +240,11 @@ public class WindowedDataStream<OUT> {
         * 
         * @return The discretised stream
         */
-       public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+       public DataStream<StreamWindow<T>> getDiscretizedStream() {
                if (getEviction() instanceof KeepAllEvictionPolicy) {
                        throw new RuntimeException("Cannot get discretized 
stream for full stream window");
                }
-               return discretize(WindowTransformation.NONE, new 
BasicWindowBuffer<OUT>())
+               return discretize(WindowTransformation.NONE, new 
BasicWindowBuffer<T>())
                                .getDiscretizedStream();
        }
 
@@ -255,7 +254,7 @@ public class WindowedDataStream<OUT> {
         * 
         * @return The data stream consisting of the individual records.
         */
-       public DataStream<OUT> flatten() {
+       public DataStream<T> flatten() {
                return dataStream;
        }
 
@@ -269,7 +268,7 @@ public class WindowedDataStream<OUT> {
         *            The reduce function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> 
reduceFunction) {
+       public DiscretizedStream<T> reduceWindow(ReduceFunction<T> 
reduceFunction) {
 
                // We check whether we should apply parallel time 
discretization, which
                // is a more complex exploiting the monotonic properties of time
@@ -281,9 +280,9 @@ public class WindowedDataStream<OUT> {
                        WindowTransformation transformation = 
WindowTransformation.REDUCEWINDOW
                                        .with(clean(reduceFunction));
 
-                       WindowBuffer<OUT> windowBuffer = 
getWindowBuffer(transformation);
+                       WindowBuffer<T> windowBuffer = 
getWindowBuffer(transformation);
 
-                       DiscretizedStream<OUT> discretized = 
discretize(transformation, windowBuffer);
+                       DiscretizedStream<T> discretized = 
discretize(transformation, windowBuffer);
 
                        if (windowBuffer instanceof PreAggregator) {
                                return discretized;
@@ -310,11 +309,11 @@ public class WindowedDataStream<OUT> {
         *            The output type of the operator
         * @return The transformed DataStream
         */
-       public <R> DiscretizedStream<R> foldWindow(R initialValue, 
FoldFunction<OUT, R> foldFunction,
+       public <R> DiscretizedStream<R> foldWindow(R initialValue, 
FoldFunction<T, R> foldFunction,
                        TypeInformation<R> outType) {
 
                return 
discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
-                               new 
BasicWindowBuffer<OUT>()).foldWindow(initialValue, foldFunction, outType);
+                               new 
BasicWindowBuffer<T>()).foldWindow(initialValue, foldFunction, outType);
 
        }
 
@@ -330,7 +329,7 @@ public class WindowedDataStream<OUT> {
         *            Initial value given to foldFunction
         * @return The transformed DataStream
         */
-       public <R> DiscretizedStream<R> foldWindow(R initialValue, 
FoldFunction<OUT, R> foldFunction) {
+       public <R> DiscretizedStream<R> foldWindow(R initialValue, 
FoldFunction<T, R> foldFunction) {
 
                TypeInformation<R> outType = 
TypeExtractor.getFoldReturnTypes(clean(foldFunction),
                                getType());
@@ -350,7 +349,7 @@ public class WindowedDataStream<OUT> {
         *            The function that will be applied to the windows.
         * @return The transformed DataStream
         */
-       public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction) {
+       public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> 
windowMapFunction) {
                return 
discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
                                
getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
        }
@@ -373,7 +372,7 @@ public class WindowedDataStream<OUT> {
         *            The output type of the operator.
         * @return The transformed DataStream
         */
-       public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction,
+       public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> 
windowMapFunction,
                        TypeInformation<R> outType) {
 
                return 
discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
@@ -381,24 +380,24 @@ public class WindowedDataStream<OUT> {
                                outType);
        }
 
-       private DiscretizedStream<OUT> discretize(WindowTransformation 
transformation,
-                       WindowBuffer<OUT> windowBuffer) {
+       private DiscretizedStream<T> discretize(WindowTransformation 
transformation,
+                       WindowBuffer<T> windowBuffer) {
 
-               OneInputStreamOperator<OUT, WindowEvent<OUT>> discretizer = 
getDiscretizer();
+               OneInputStreamOperator<T, WindowEvent<T>> discretizer = 
getDiscretizer();
 
-               OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> 
bufferOperator = getBufferOperator(windowBuffer);
+               OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> 
bufferOperator = getBufferOperator(windowBuffer);
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
-               TypeInformation<WindowEvent<OUT>> bufferEventType = new 
TupleTypeInfo(WindowEvent.class,
+               TypeInformation<WindowEvent<T>> bufferEventType = new 
TupleTypeInfo(WindowEvent.class,
                                getType(), BasicTypeInfo.INT_TYPE_INFO);
 
                int parallelism = getDiscretizerParallelism(transformation);
 
-               return new DiscretizedStream<OUT>(dataStream
+               return new DiscretizedStream<T>(dataStream
                                
.transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
                                .setParallelism(parallelism)
                                
.transform(windowBuffer.getClass().getSimpleName(),
-                                               new 
StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
+                                               new 
StreamWindowTypeInfo<T>(getType()), bufferOperator)
                                .setParallelism(parallelism), groupByKey, 
transformation, false);
 
        }
@@ -430,7 +429,7 @@ public class WindowedDataStream<OUT> {
         *            Reduce function to apply
         * @return The transformed stream
         */
-       protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> 
reduceFunction) {
+       protected DiscretizedStream<T> timeReduce(ReduceFunction<T> 
reduceFunction) {
 
                WindowTransformation transformation = 
WindowTransformation.REDUCEWINDOW
                                .with(clean(reduceFunction));
@@ -438,7 +437,7 @@ public class WindowedDataStream<OUT> {
                // We get the windowbuffer and set it to emit empty windows with
                // sequential IDs. This logic is necessary to merge windows 
created in
                // parallel.
-               WindowBuffer<OUT> windowBuffer = 
getWindowBuffer(transformation).emitEmpty().sequentialID();
+               WindowBuffer<T> windowBuffer = 
getWindowBuffer(transformation).emitEmpty().sequentialID();
 
                // If there is a groupby for the reduce operation we apply it 
before the
                // discretizers, because we will forward everything afterwards 
to
@@ -449,7 +448,7 @@ public class WindowedDataStream<OUT> {
 
                // We discretize the stream and call the timeReduce function of 
the
                // discretized stream, we also pass the type of the windowbuffer
-               DiscretizedStream<OUT> discretized = discretize(transformation, 
windowBuffer);
+               DiscretizedStream<T> discretized = discretize(transformation, 
windowBuffer);
 
                if (getEviction() instanceof KeepAllEvictionPolicy
                                && !(windowBuffer instanceof PreAggregator)) {
@@ -464,27 +463,27 @@ public class WindowedDataStream<OUT> {
        /**
         * Based on the defined policies, returns the stream discretizer to be 
used
         */
-       private OneInputStreamOperator<OUT, WindowEvent<OUT>> getDiscretizer() {
+       private OneInputStreamOperator<T, WindowEvent<T>> getDiscretizer() {
                if (discretizerKey == null) {
-                       return new StreamDiscretizer<OUT>(getTrigger(), 
getEviction());
+                       return new StreamDiscretizer<T>(getTrigger(), 
getEviction());
                } else if (getTrigger() instanceof CentralActiveTrigger) {
-                       return new GroupedActiveDiscretizer<OUT>(discretizerKey,
-                                       (CentralActiveTrigger<OUT>) 
getTrigger(),
-                                       (CloneableEvictionPolicy<OUT>) 
getEviction());
+                       return new GroupedActiveDiscretizer<T>(discretizerKey,
+                                       (CentralActiveTrigger<T>) getTrigger(),
+                                       (CloneableEvictionPolicy<T>) 
getEviction());
                } else {
-                       return new GroupedStreamDiscretizer<OUT>(discretizerKey,
-                                       (CloneableTriggerPolicy<OUT>) 
getTrigger(),
-                                       (CloneableEvictionPolicy<OUT>) 
getEviction());
+                       return new GroupedStreamDiscretizer<T>(discretizerKey,
+                                       (CloneableTriggerPolicy<T>) 
getTrigger(),
+                                       (CloneableEvictionPolicy<T>) 
getEviction());
                }
 
        }
 
-       private OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> 
getBufferOperator(
-                       WindowBuffer<OUT> windowBuffer) {
+       private OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> 
getBufferOperator(
+                       WindowBuffer<T> windowBuffer) {
                if (discretizerKey == null) {
-                       return new StreamWindowBuffer<OUT>(windowBuffer);
+                       return new StreamWindowBuffer<T>(windowBuffer);
                } else {
-                       return new GroupedWindowBuffer<OUT>(windowBuffer, 
discretizerKey);
+                       return new GroupedWindowBuffer<T>(windowBuffer, 
discretizerKey);
                }
        }
 
@@ -496,43 +495,43 @@ public class WindowedDataStream<OUT> {
         * 
         */
        @SuppressWarnings("unchecked")
-       private WindowBuffer<OUT> getWindowBuffer(WindowTransformation 
transformation) {
-               TriggerPolicy<OUT> trigger = getTrigger();
-               EvictionPolicy<OUT> eviction = getEviction();
+       private WindowBuffer<T> getWindowBuffer(WindowTransformation 
transformation) {
+               TriggerPolicy<T> trigger = getTrigger();
+               EvictionPolicy<T> eviction = getEviction();
 
                if (transformation == WindowTransformation.REDUCEWINDOW) {
                        if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
                                if (eviction instanceof KeepAllEvictionPolicy) {
                                        if (groupByKey == null) {
-                                               return new 
TumblingPreReducer<OUT>(
-                                                               
(ReduceFunction<OUT>) transformation.getUDF(), getType()
+                                               return new 
TumblingPreReducer<T>(
+                                                               
(ReduceFunction<T>) transformation.getUDF(), getType()
                                                                                
.createSerializer(getExecutionConfig())).noEvict();
                                        } else {
-                                               return new 
TumblingGroupedPreReducer<OUT>(
-                                                               
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+                                               return new 
TumblingGroupedPreReducer<T>(
+                                                               
(ReduceFunction<T>) transformation.getUDF(), groupByKey,
                                                                
getType().createSerializer(getExecutionConfig())).noEvict();
                                        }
                                } else {
                                        if (groupByKey == null) {
-                                               return new 
TumblingPreReducer<OUT>(
-                                                               
(ReduceFunction<OUT>) transformation.getUDF(), getType()
+                                               return new 
TumblingPreReducer<T>(
+                                                               
(ReduceFunction<T>) transformation.getUDF(), getType()
                                                                                
.createSerializer(getExecutionConfig()));
                                        } else {
-                                               return new 
TumblingGroupedPreReducer<OUT>(
-                                                               
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+                                               return new 
TumblingGroupedPreReducer<T>(
+                                                               
(ReduceFunction<T>) transformation.getUDF(), groupByKey,
                                                                
getType().createSerializer(getExecutionConfig()));
                                        }
                                }
                        } else if (WindowUtils.isSlidingCountPolicy(trigger, 
eviction)) {
                                if (groupByKey == null) {
-                                       return new SlidingCountPreReducer<OUT>(
-                                                       
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+                                       return new SlidingCountPreReducer<T>(
+                                                       
clean((ReduceFunction<T>) transformation.getUDF()), dataStream
                                                                        
.getType().createSerializer(getExecutionConfig()),
                                                        
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
                                                        
((CountTriggerPolicy<?>) trigger).getStart());
                                } else {
-                                       return new 
SlidingCountGroupedPreReducer<OUT>(
-                                                       
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+                                       return new 
SlidingCountGroupedPreReducer<T>(
+                                                       
clean((ReduceFunction<T>) transformation.getUDF()), dataStream
                                                                        
.getType().createSerializer(getExecutionConfig()), groupByKey,
                                                        
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
                                                        
((CountTriggerPolicy<?>) trigger).getStart());
@@ -540,14 +539,14 @@ public class WindowedDataStream<OUT> {
 
                        } else if (WindowUtils.isSlidingTimePolicy(trigger, 
eviction)) {
                                if (groupByKey == null) {
-                                       return new SlidingTimePreReducer<OUT>(
-                                                       (ReduceFunction<OUT>) 
transformation.getUDF(), dataStream.getType()
+                                       return new SlidingTimePreReducer<T>(
+                                                       (ReduceFunction<T>) 
transformation.getUDF(), dataStream.getType()
                                                                        
.createSerializer(getExecutionConfig()),
                                                        
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
                                                        
WindowUtils.getTimeStampWrapper(trigger));
                                } else {
-                                       return new 
SlidingTimeGroupedPreReducer<OUT>(
-                                                       (ReduceFunction<OUT>) 
transformation.getUDF(), dataStream.getType()
+                                       return new 
SlidingTimeGroupedPreReducer<T>(
+                                                       (ReduceFunction<T>) 
transformation.getUDF(), dataStream.getType()
                                                                        
.createSerializer(getExecutionConfig()), groupByKey,
                                                        
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
                                                        
WindowUtils.getTimeStampWrapper(trigger));
@@ -555,26 +554,26 @@ public class WindowedDataStream<OUT> {
 
                        } else if (WindowUtils.isJumpingCountPolicy(trigger, 
eviction)) {
                                if (groupByKey == null) {
-                                       return new JumpingCountPreReducer<OUT>(
-                                                       (ReduceFunction<OUT>) 
transformation.getUDF(), getType()
+                                       return new JumpingCountPreReducer<T>(
+                                                       (ReduceFunction<T>) 
transformation.getUDF(), getType()
                                                                        
.createSerializer(getExecutionConfig()),
                                                        
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
                                } else {
-                                       return new 
JumpingCountGroupedPreReducer<OUT>(
-                                                       (ReduceFunction<OUT>) 
transformation.getUDF(), groupByKey, getType()
+                                       return new 
JumpingCountGroupedPreReducer<T>(
+                                                       (ReduceFunction<T>) 
transformation.getUDF(), groupByKey, getType()
                                                                        
.createSerializer(getExecutionConfig()),
                                                        
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
                                }
                        } else if (WindowUtils.isJumpingTimePolicy(trigger, 
eviction)) {
                                if (groupByKey == null) {
-                                       return new JumpingTimePreReducer<OUT>(
-                                                       (ReduceFunction<OUT>) 
transformation.getUDF(), getType()
+                                       return new JumpingTimePreReducer<T>(
+                                                       (ReduceFunction<T>) 
transformation.getUDF(), getType()
                                                                        
.createSerializer(getExecutionConfig()),
                                                        
WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
                                                        
WindowUtils.getTimeStampWrapper(trigger));
                                } else {
-                                       return new 
JumpingTimeGroupedPreReducer<OUT>(
-                                                       (ReduceFunction<OUT>) 
transformation.getUDF(), groupByKey, getType()
+                                       return new 
JumpingTimeGroupedPreReducer<T>(
+                                                       (ReduceFunction<T>) 
transformation.getUDF(), groupByKey, getType()
                                                                        
.createSerializer(getExecutionConfig()),
                                                        
WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
                                                        
WindowUtils.getTimeStampWrapper(trigger));
@@ -586,7 +585,7 @@ public class WindowedDataStream<OUT> {
                        throw new RuntimeException(
                                        "Full stream policy can only be used 
with operations that support preaggregations, such as reduce or aggregations");
                } else {
-                       return new BasicWindowBuffer<OUT>();
+                       return new BasicWindowBuffer<T>();
                }
        }
 
@@ -598,8 +597,8 @@ public class WindowedDataStream<OUT> {
         *            The position in the tuple/array to sum
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> sum(int positionToSum) {
-               return aggregate(new SumAggregator<OUT>(positionToSum, 
getType(), getExecutionConfig()));
+       public WindowedDataStream<T> sum(int positionToSum) {
+               return aggregate(new SumAggregator<T>(positionToSum, getType(), 
getExecutionConfig()));
        }
 
        /**
@@ -613,8 +612,8 @@ public class WindowedDataStream<OUT> {
         *            The field to sum
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> sum(String field) {
-               return aggregate(new SumAggregator<OUT>(field, getType(), 
getExecutionConfig()));
+       public WindowedDataStream<T> sum(String field) {
+               return aggregate(new SumAggregator<T>(field, getType(), 
getExecutionConfig()));
        }
 
        /**
@@ -625,8 +624,8 @@ public class WindowedDataStream<OUT> {
         *            The position to minimize
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> min(int positionToMin) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMin, 
getType(), AggregationType.MIN,
+       public WindowedDataStream<T> min(int positionToMin) {
+               return aggregate(new ComparableAggregator<T>(positionToMin, 
getType(), AggregationType.MIN,
                                getExecutionConfig()));
        }
 
@@ -642,8 +641,8 @@ public class WindowedDataStream<OUT> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> min(String field) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MIN,
+       public WindowedDataStream<T> min(String field) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MIN,
                                false, getExecutionConfig()));
        }
 
@@ -656,7 +655,7 @@ public class WindowedDataStream<OUT> {
         *            The position to minimize by
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> minBy(int positionToMinBy) {
+       public WindowedDataStream<T> minBy(int positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
 
@@ -669,7 +668,7 @@ public class WindowedDataStream<OUT> {
         *            The position to minimize by
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> minBy(String positionToMinBy) {
+       public WindowedDataStream<T> minBy(String positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
 
@@ -686,8 +685,8 @@ public class WindowedDataStream<OUT> {
         *            minimum value, otherwise returns the last
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean 
first) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMinBy, 
getType(), AggregationType.MINBY, first,
+       public WindowedDataStream<T> minBy(int positionToMinBy, boolean first) {
+               return aggregate(new ComparableAggregator<T>(positionToMinBy, 
getType(), AggregationType.MINBY, first,
                                getExecutionConfig()));
        }
 
@@ -706,8 +705,8 @@ public class WindowedDataStream<OUT> {
         *            be returned
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> minBy(String field, boolean first) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MINBY,
+       public WindowedDataStream<T> minBy(String field, boolean first) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MINBY,
                                first, getExecutionConfig()));
        }
 
@@ -719,8 +718,8 @@ public class WindowedDataStream<OUT> {
         *            The position to maximize
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> max(int positionToMax) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMax, 
getType(), AggregationType.MAX,
+       public WindowedDataStream<T> max(int positionToMax) {
+               return aggregate(new ComparableAggregator<T>(positionToMax, 
getType(), AggregationType.MAX,
                                getExecutionConfig()));
        }
 
@@ -736,8 +735,8 @@ public class WindowedDataStream<OUT> {
         *            applied.
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> max(String field) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MAX,
+       public WindowedDataStream<T> max(String field) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MAX,
                                false, getExecutionConfig()));
        }
 
@@ -750,7 +749,7 @@ public class WindowedDataStream<OUT> {
         *            The position to maximize by
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> maxBy(int positionToMaxBy) {
+       public WindowedDataStream<T> maxBy(int positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
 
@@ -763,7 +762,7 @@ public class WindowedDataStream<OUT> {
         *            The position to maximize by
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> maxBy(String positionToMaxBy) {
+       public WindowedDataStream<T> maxBy(String positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
 
@@ -780,8 +779,8 @@ public class WindowedDataStream<OUT> {
         *            maximum value, otherwise returns the last
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean 
first) {
-               return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, 
getType(), AggregationType.MAXBY, first,
+       public WindowedDataStream<T> maxBy(int positionToMaxBy, boolean first) {
+               return aggregate(new ComparableAggregator<T>(positionToMaxBy, 
getType(), AggregationType.MAXBY, first,
                                getExecutionConfig()));
        }
 
@@ -800,16 +799,16 @@ public class WindowedDataStream<OUT> {
         *            be returned
         * @return The transformed DataStream.
         */
-       public WindowedDataStream<OUT> maxBy(String field, boolean first) {
-               return aggregate(new ComparableAggregator<OUT>(field, 
getType(), AggregationType.MAXBY, first,
+       public WindowedDataStream<T> maxBy(String field, boolean first) {
+               return aggregate(new ComparableAggregator<T>(field, getType(), 
AggregationType.MAXBY, first,
                                getExecutionConfig()));
        }
 
-       private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> 
aggregator) {
+       private WindowedDataStream<T> aggregate(AggregationFunction<T> 
aggregator) {
                return reduceWindow(aggregator);
        }
 
-       protected TriggerPolicy<OUT> getTrigger() {
+       protected TriggerPolicy<T> getTrigger() {
 
                if (triggerHelper != null) {
                        return triggerHelper.toTrigger();
@@ -821,7 +820,7 @@ public class WindowedDataStream<OUT> {
 
        }
 
-       protected EvictionPolicy<OUT> getEviction() {
+       protected EvictionPolicy<T> getEviction() {
 
                if (evictionHelper != null) {
                        return evictionHelper.toEvict();
@@ -829,7 +828,7 @@ public class WindowedDataStream<OUT> {
                        if (triggerHelper instanceof Time) {
                                return triggerHelper.toEvict();
                        } else {
-                               return new TumblingEvictionPolicy<OUT>();
+                               return new TumblingEvictionPolicy<T>();
                        }
                } else {
                        return userEvicter;
@@ -854,7 +853,7 @@ public class WindowedDataStream<OUT> {
         * 
         * @return The output type.
         */
-       public TypeInformation<OUT> getType() {
+       public TypeInformation<T> getType() {
                return dataStream.getType();
        }
 
@@ -862,7 +861,7 @@ public class WindowedDataStream<OUT> {
                return dataStream.getExecutionConfig();
        }
 
-       protected WindowedDataStream<OUT> copy() {
-               return new WindowedDataStream<OUT>(this);
+       protected WindowedDataStream<T> copy() {
+               return new WindowedDataStream<T>(this);
        }
 }

Reply via email to