[FLINK-2725] Add Max/Min/Sum aggregation for mutable types.

This closes #1191


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da248b15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da248b15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da248b15

Branch: refs/heads/master
Commit: da248b15e1b1dbe09345d3bb186dc815a45e9a3c
Parents: 6491559
Author: Greg Hogan <c...@greghogan.com>
Authored: Tue Sep 22 13:01:47 2015 -0400
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Oct 19 15:39:28 2015 +0200

----------------------------------------------------------------------
 .../aggregation/MaxAggregationFunction.java     |  83 ++-
 .../aggregation/MinAggregationFunction.java     |  85 ++-
 .../aggregation/SumAggregationFunction.java     | 190 ++++-
 .../flink/api/java/typeutils/ValueTypeInfo.java |  13 +-
 .../test/javaApiOperators/AggregateITCase.java  |  71 ++
 .../util/ValueCollectionDataSets.java           | 730 +++++++++++++++++++
 6 files changed, 1110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
index f25ca87..59d9e13 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
@@ -18,35 +18,74 @@
 
 package org.apache.flink.api.java.aggregation;
 
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.ResettableValue;
 
-public class MaxAggregationFunction<T extends Comparable<T>> extends 
AggregationFunction<T> {
-       private static final long serialVersionUID = 1L;
 
-       private T value;
+public abstract class MaxAggregationFunction<T extends Comparable<T>> extends 
AggregationFunction<T> {
+       private static final long serialVersionUID = 1L;
 
        @Override
-       public void initializeAggregate() {
-               value = null;
+       public String toString() {
+               return "MAX";
        }
 
-       @Override
-       public void aggregate(T val) {
-               if (value != null) {
-                       int cmp = value.compareTo(val);
-                       value = (cmp > 0) ? value : val;
-               } else {
-                       value = val;
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final class ImmutableMaxAgg<U extends Comparable<U>> 
extends MaxAggregationFunction<U> {
+               private static final long serialVersionUID = 1L;
+
+               private U value;
+
+               @Override
+               public void initializeAggregate() {
+                       value = null;
                }
-       }
 
-       @Override
-       public T getAggregate() {
-               return value;
+               @Override
+               public void aggregate(U val) {
+                       if (value != null) {
+                               int cmp = value.compareTo(val);
+                               value = (cmp > 0) ? value : val;
+                       } else {
+                               value = val;
+                       }
+               }
+
+               @Override
+               public U getAggregate() {
+                       return value;
+               }
        }
        
-       @Override
-       public String toString() {
-               return "MAX";
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final class MutableMaxAgg<U extends Comparable<U> & 
ResettableValue<U> & CopyableValue<U>> extends MaxAggregationFunction<U> {
+               private static final long serialVersionUID = 1L;
+
+               private U value;
+
+               @Override
+               public void initializeAggregate() {
+                       value = null;
+               }
+
+               @Override
+               public void aggregate(U val) {
+                       if (value != null) {
+                               int cmp = value.compareTo(val);
+                               if (cmp < 0) {
+                                       value.setValue(val);
+                               }
+                       } else {
+                               value = val.copy();
+                       }
+               }
+
+               @Override
+               public U getAggregate() {
+                       return value;
+               }
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -58,7 +97,11 @@ public class MaxAggregationFunction<T extends Comparable<T>> 
extends Aggregation
                @Override
                public <T> AggregationFunction<T> 
createAggregationFunction(Class<T> type) {
                        if (Comparable.class.isAssignableFrom(type)) {
-                               return (AggregationFunction<T>) new 
MaxAggregationFunction();
+                               if 
(ResettableValue.class.isAssignableFrom(type) & 
CopyableValue.class.isAssignableFrom(type)) {
+                                       return (AggregationFunction<T>) new 
MutableMaxAgg();
+                               } else {
+                                       return (AggregationFunction<T>) new 
ImmutableMaxAgg();
+                               }
                        } else {
                                throw new 
UnsupportedAggregationTypeException("The type " + type.getName() + 
                                        " is not supported for maximum 
aggregation. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
index faf28a7..b72b0f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
@@ -18,35 +18,74 @@
 
 package org.apache.flink.api.java.aggregation;
 
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.ResettableValue;
 
-public class MinAggregationFunction<T extends Comparable<T>> extends 
AggregationFunction<T> {
-       private static final long serialVersionUID = 1L;
 
-       private T value;
+public abstract class MinAggregationFunction<T extends Comparable<T>> extends 
AggregationFunction<T> {
+       private static final long serialVersionUID = 1L;
 
        @Override
-       public void initializeAggregate() {
-               value = null;
+       public String toString() {
+               return "MIN";
        }
 
-       @Override
-       public void aggregate(T val) {
-               if (value != null) {
-                       int cmp = value.compareTo(val);
-                       value = (cmp < 0) ? value : val;
-               } else {
-                       value = val;
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final class ImmutableMinAgg<U extends Comparable<U>> 
extends MinAggregationFunction<U> {
+               private static final long serialVersionUID = 1L;
+
+               private U value;
+
+               @Override
+               public void initializeAggregate() {
+                       value = null;
                }
-       }
 
-       @Override
-       public T getAggregate() {
-               return value;
+               @Override
+               public void aggregate(U val) {
+                       if (value != null) {
+                               int cmp = value.compareTo(val);
+                               value = (cmp < 0) ? value : val;
+                       } else {
+                               value = val;
+                       }
+               }
+
+               @Override
+               public U getAggregate() {
+                       return value;
+               }
        }
-       
-       @Override
-       public String toString() {
-               return "MIN";
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public static final class MutableMinAgg<U extends Comparable<U> & 
ResettableValue<U> & CopyableValue<U>> extends MinAggregationFunction<U> {
+               private static final long serialVersionUID = 1L;
+
+               private U value;
+
+               @Override
+               public void initializeAggregate() {
+                       value = null;
+               }
+
+               @Override
+               public void aggregate(U val) {
+                       if (value != null) {
+                               int cmp = value.compareTo(val);
+                               if (cmp > 0) {
+                                       value.setValue(val);
+                               }
+                       } else {
+                               value = val.copy();
+                       }
+               }
+
+               @Override
+               public U getAggregate() {
+                       return value;
+               }
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -58,7 +97,11 @@ public class MinAggregationFunction<T extends Comparable<T>> 
extends Aggregation
                @Override
                public <T> AggregationFunction<T> 
createAggregationFunction(Class<T> type) {
                        if (Comparable.class.isAssignableFrom(type)) {
-                               return (AggregationFunction<T>) new 
MinAggregationFunction();
+                               if 
(ResettableValue.class.isAssignableFrom(type) & 
CopyableValue.class.isAssignableFrom(type)) {
+                                       return (AggregationFunction<T>) new 
MutableMinAgg();
+                               } else {
+                                       return (AggregationFunction<T>) new 
ImmutableMinAgg();
+                               }
                        } else {
                                throw new 
UnsupportedAggregationTypeException("The type " + type.getName() + 
                                        " is not supported for minimum 
aggregation. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
index 24e8f31..ad4644b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
@@ -18,21 +18,27 @@
 
 package org.apache.flink.api.java.aggregation;
 
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
 
 public abstract class SumAggregationFunction<T> extends AggregationFunction<T> 
{
-       
+
        private static final long serialVersionUID = 1L;
 
        @Override
        public String toString() {
                return "SUM";
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static final class ByteSumAgg extends 
SumAggregationFunction<Byte> {
                private static final long serialVersionUID = 1L;
-               
+
                private long agg;
 
                @Override
@@ -50,10 +56,31 @@ public abstract class SumAggregationFunction<T> extends 
AggregationFunction<T> {
                        return (byte) agg;
                }
        }
-       
+
+       public static final class ByteValueSumAgg extends 
SumAggregationFunction<ByteValue> {
+               private static final long serialVersionUID = 1L;
+
+               private long agg;
+
+               @Override
+               public void initializeAggregate() {
+                       agg = 0;
+               }
+
+               @Override
+               public void aggregate(ByteValue value) {
+                       agg += value.getValue();
+               }
+
+               @Override
+               public ByteValue getAggregate() {
+                       return new ByteValue((byte) agg);
+               }
+       }
+
        public static final class ShortSumAgg extends 
SumAggregationFunction<Short> {
                private static final long serialVersionUID = 1L;
-               
+
                private long agg;
 
                @Override
@@ -71,10 +98,31 @@ public abstract class SumAggregationFunction<T> extends 
AggregationFunction<T> {
                        return (short) agg;
                }
        }
-       
+
+       public static final class ShortValueSumAgg extends 
SumAggregationFunction<ShortValue> {
+               private static final long serialVersionUID = 1L;
+
+               private long agg;
+
+               @Override
+               public void initializeAggregate() {
+                       agg = 0;
+               }
+
+               @Override
+               public void aggregate(ShortValue value) {
+                       agg += value.getValue();
+               }
+
+               @Override
+               public ShortValue getAggregate() {
+                       return new ShortValue((short) agg);
+               }
+       }
+
        public static final class IntSumAgg extends 
SumAggregationFunction<Integer> {
                private static final long serialVersionUID = 1L;
-               
+
                private long agg;
 
                @Override
@@ -92,10 +140,31 @@ public abstract class SumAggregationFunction<T> extends 
AggregationFunction<T> {
                        return (int) agg;
                }
        }
-       
+
+       public static final class IntValueSumAgg extends 
SumAggregationFunction<IntValue> {
+               private static final long serialVersionUID = 1L;
+
+               private long agg;
+
+               @Override
+               public void initializeAggregate() {
+                       agg = 0;
+               }
+
+               @Override
+               public void aggregate(IntValue value) {
+                       agg += value.getValue();
+               }
+
+               @Override
+               public IntValue getAggregate() {
+                       return new IntValue((int) agg);
+               }
+       }
+
        public static final class LongSumAgg extends 
SumAggregationFunction<Long> {
                private static final long serialVersionUID = 1L;
-               
+
                private long agg;
 
                @Override
@@ -113,11 +182,32 @@ public abstract class SumAggregationFunction<T> extends 
AggregationFunction<T> {
                        return agg;
                }
        }
-       
+
+       public static final class LongValueSumAgg extends 
SumAggregationFunction<LongValue> {
+               private static final long serialVersionUID = 1L;
+
+               private long agg;
+
+               @Override
+               public void initializeAggregate() {
+                       agg = 0L;
+               }
+
+               @Override
+               public void aggregate(LongValue value) {
+                       agg += value.getValue();
+               }
+
+               @Override
+               public LongValue getAggregate() {
+                       return new LongValue(agg);
+               }
+       }
+
        public static final class FloatSumAgg extends 
SumAggregationFunction<Float> {
                private static final long serialVersionUID = 1L;
-               
-               private float agg;
+
+               private double agg;
 
                @Override
                public void initializeAggregate() {
@@ -131,13 +221,34 @@ public abstract class SumAggregationFunction<T> extends 
AggregationFunction<T> {
 
                @Override
                public Float getAggregate() {
-                       return agg;
+                       return (float) agg;
+               }
+       }
+
+       public static final class FloatValueSumAgg extends 
SumAggregationFunction<FloatValue> {
+               private static final long serialVersionUID = 1L;
+
+               private double agg;
+
+               @Override
+               public void initializeAggregate() {
+                       agg = 0.0f;
+               }
+
+               @Override
+               public void aggregate(FloatValue value) {
+                       agg += value.getValue();
+               }
+
+               @Override
+               public FloatValue getAggregate() {
+                       return new FloatValue((float) agg);
                }
        }
-       
+
        public static final class DoubleSumAgg extends 
SumAggregationFunction<Double> {
                private static final long serialVersionUID = 1L;
-               
+
                private double agg;
 
                @Override
@@ -155,36 +266,75 @@ public abstract class SumAggregationFunction<T> extends 
AggregationFunction<T> {
                        return agg;
                }
        }
-       
+
+       public static final class DoubleValueSumAgg extends 
SumAggregationFunction<DoubleValue> {
+               private static final long serialVersionUID = 1L;
+
+               private double agg;
+
+               @Override
+               public void initializeAggregate() {
+                       agg = 0.0;
+               }
+
+               @Override
+               public void aggregate(DoubleValue value) {
+                       agg += value.getValue();
+               }
+
+               @Override
+               public DoubleValue getAggregate() {
+                       return new DoubleValue(agg);
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static final class SumAggregationFunctionFactory implements 
AggregationFunctionFactory {
                private static final long serialVersionUID = 1L;
-               
+
                @SuppressWarnings("unchecked")
                @Override
                public <T> AggregationFunction<T> 
createAggregationFunction(Class<T> type) {
                        if (type == Long.class) {
                                return (AggregationFunction<T>) new 
LongSumAgg();
                        }
+                       else if (type == LongValue.class) {
+                               return (AggregationFunction<T>) new 
LongValueSumAgg();
+                       }
                        else if (type == Integer.class) {
                                return (AggregationFunction<T>) new IntSumAgg();
                        }
+                       else if (type == IntValue.class) {
+                               return (AggregationFunction<T>) new 
IntValueSumAgg();
+                       }
                        else if (type == Double.class) {
                                return (AggregationFunction<T>) new 
DoubleSumAgg();
                        }
+                       else if (type == DoubleValue.class) {
+                               return (AggregationFunction<T>) new 
DoubleValueSumAgg();
+                       }
                        else if (type == Float.class) {
                                return (AggregationFunction<T>) new 
FloatSumAgg();
                        }
+                       else if (type == FloatValue.class) {
+                               return (AggregationFunction<T>) new 
FloatValueSumAgg();
+                       }
                        else if (type == Byte.class) {
                                return (AggregationFunction<T>) new 
ByteSumAgg();
                        }
+                       else if (type == ByteValue.class) {
+                               return (AggregationFunction<T>) new 
ByteValueSumAgg();
+                       }
                        else if (type == Short.class) {
                                return (AggregationFunction<T>) new 
ShortSumAgg();
                        }
+                       else if (type == ShortValue.class) {
+                               return (AggregationFunction<T>) new 
ShortValueSumAgg();
+                       }
                        else {
                                throw new 
UnsupportedAggregationTypeException("The type " + type.getName() + 
-                                       " has currently not supported for 
built-in sum aggregations.");
+                                       " is currently not supported for 
built-in sum aggregations.");
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 0b4823e..5187de7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -51,7 +51,18 @@ import org.apache.flink.types.Value;
 public class ValueTypeInfo<T extends Value> extends TypeInformation<T> 
implements AtomicType<T> {
 
        private static final long serialVersionUID = 1L;
-       
+
+       public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO 
= new ValueTypeInfo<>(BooleanValue.class);
+       public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(ByteValue.class);
+       public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(CharValue.class);
+       public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(DoubleValue.class);
+       public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(FloatValue.class);
+       public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(IntValue.class);
+       public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(LongValue.class);
+       public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new 
ValueTypeInfo<>(NullValue.class);
+       public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(ShortValue.class);
+       public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = 
new ValueTypeInfo<>(StringValue.class);
+
        private final Class<T> type;
        
        public ValueTypeInfo(Class<T> type) {

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index d02f228..fc01ce7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -25,7 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -62,6 +66,27 @@ public class AggregateITCase extends 
MultipleProgramsTestBase {
        }
 
        @Test
+       public void testFullAggregateOfMutableValueTypes() throws Exception {
+               /*
+                * Full Aggregate of mutable value types
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = 
ValueCollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
+                               .aggregate(Aggregations.SUM, 0)
+                               .and(Aggregations.MAX, 1)
+                               .project(0, 1);
+
+               List<Tuple2<IntValue, LongValue>> result = 
aggregateDs.collect();
+
+               String expected = "231,6\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
        public void testGroupedAggregate() throws Exception {
                /*
                 * Grouped Aggregate
@@ -87,6 +112,31 @@ public class AggregateITCase extends 
MultipleProgramsTestBase {
        }
 
        @Test
+       public void testGroupedAggregateOfMutableValueTypes() throws Exception {
+               /*
+                * Grouped Aggregate of mutable value types
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = 
ValueCollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.SUM, 0)
+                               .project(1, 0);
+
+               List<Tuple2<IntValue, LongValue>> result = 
aggregateDs.collect();
+
+               String expected = "1,1\n" +
+                               "2,5\n" +
+                               "3,15\n" +
+                               "4,34\n" +
+                               "5,65\n" +
+                               "6,111\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
        public void testNestedAggregate() throws Exception {
                /*
                 * Nested Aggregate
@@ -106,4 +156,25 @@ public class AggregateITCase extends 
MultipleProgramsTestBase {
 
                compareResultAsTuples(result, expected);
        }
+
+       @Test
+       public void testNestedAggregateOfMutableValueTypes() throws Exception {
+               /*
+                * Nested Aggregate of mutable value types
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = 
ValueCollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.MIN, 0)
+                               .aggregate(Aggregations.MIN, 0)
+                               .project(0);
+
+               List<Tuple1<IntValue>> result = aggregateDs.collect();
+
+               String expected = "1\n";
+
+               compareResultAsTuples(result, expected);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
new file mode 100644
index 0000000..04a7bc5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.util;
+
+import java.io.File;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.io.IntWritable;
+
+import scala.math.BigInt;
+
+/**
+ * #######################################################################################################
+ *
+ *                     BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
+ *                     IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
+ *
+ * #######################################################################################################
+ */
+public class ValueCollectionDataSets {
+
+       /**
+        * Creates a data set of 21 {@code (IntValue, LongValue, StringValue)} tuples.
+        * The first field counts 1..21; the list is shuffled before it is handed to the environment.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) {
+               final long[] secondField = {
+                       1L, 2L, 2L, 3L, 3L, 3L, 4L, 4L, 4L, 4L, 5L, 5L, 5L, 5L, 5L, 6L, 6L, 6L, 6L, 6L, 6L
+               };
+               final String[] thirdField = {
+                       "Hi", "Hello", "Hello world", "Hello world, how are you?", "I am fine.", "Luke Skywalker",
+                       "Comment#1", "Comment#2", "Comment#3", "Comment#4", "Comment#5",
+                       "Comment#6", "Comment#7", "Comment#8", "Comment#9", "Comment#10",
+                       "Comment#11", "Comment#12", "Comment#13", "Comment#14", "Comment#15"
+               };
+
+               List<Tuple3<IntValue, LongValue, StringValue>> tuples = new ArrayList<>(thirdField.length);
+               for (int i = 0; i < thirdField.length; i++) {
+                       // the first field is the 1-based position of the record
+                       tuples.add(new Tuple3<>(new IntValue(i + 1), new LongValue(secondField[i]), new StringValue(thirdField[i])));
+               }
+
+               Collections.shuffle(tuples);
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Creates a shuffled data set holding three {@code (IntValue, LongValue, StringValue)} tuples.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) {
+               final long[] secondField = {1L, 2L, 2L};
+               final String[] thirdField = {"Hi", "Hello", "Hello world"};
+
+               List<Tuple3<IntValue, LongValue, StringValue>> tuples = new ArrayList<>(thirdField.length);
+               for (int i = 0; i < thirdField.length; i++) {
+                       // the first field is the 1-based position of the record
+                       tuples.add(new Tuple3<>(new IntValue(i + 1), new LongValue(secondField[i]), new StringValue(thirdField[i])));
+               }
+
+               Collections.shuffle(tuples);
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Creates a shuffled data set of 15 five-field tuples of mutable value types and passes
+        * explicit tuple type information to the environment.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) {
+               final int[] field0 = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5};
+               final String[] field3 = {
+                       "Hallo", "Hallo Welt", "Hallo Welt wie", "Hallo Welt wie gehts?", "ABC",
+                       "BCD", "CDE", "DEF", "EFG", "FGH",
+                       "GHI", "HIJ", "IJK", "JKL", "KLM"
+               };
+               final long[] field4 = {1L, 2L, 1L, 2L, 2L, 3L, 2L, 1L, 1L, 2L, 1L, 3L, 3L, 2L, 2L};
+
+               List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> tuples = new ArrayList<>(field0.length);
+               for (int i = 0; i < field0.length; i++) {
+                       // field 1 runs 1..15 and field 2 runs 0..14 along with the record index
+                       tuples.add(new Tuple5<>(
+                               new IntValue(field0[i]),
+                               new LongValue(i + 1L),
+                               new IntValue(i),
+                               new StringValue(field3[i]),
+                               new LongValue(field4[i])));
+               }
+
+               Collections.shuffle(tuples);
+
+               TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> tupleType =
+                       new TupleTypeInfo<>(
+                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
+                               ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
+                               ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+                               ValueTypeInfo.LONG_VALUE_TYPE_INFO);
+
+               return env.fromCollection(tuples, tupleType);
+       }
+
+       /**
+        * Creates a shuffled data set of three five-field tuples of mutable value types and passes
+        * explicit tuple type information to the environment.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) {
+               final int[] field0 = {1, 2, 2};
+               final String[] field3 = {"Hallo", "Hallo Welt", "Hallo Welt wie"};
+               final long[] field4 = {1L, 2L, 1L};
+
+               List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> tuples = new ArrayList<>(field0.length);
+               for (int i = 0; i < field0.length; i++) {
+                       // field 1 runs 1..3 and field 2 runs 0..2 along with the record index
+                       tuples.add(new Tuple5<>(
+                               new IntValue(field0[i]),
+                               new LongValue(i + 1L),
+                               new IntValue(i),
+                               new StringValue(field3[i]),
+                               new LongValue(field4[i])));
+               }
+
+               Collections.shuffle(tuples);
+
+               TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> tupleType =
+                       new TupleTypeInfo<>(
+                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
+                               ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+                               ValueTypeInfo.INT_VALUE_TYPE_INFO,
+                               ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+                               ValueTypeInfo.LONG_VALUE_TYPE_INFO);
+
+               return env.fromCollection(tuples, tupleType);
+       }
+
+       /**
+        * Creates a data set of three {@code ((IntValue, IntValue), StringValue)} tuples, in
+        * insertion order, with explicit type information for the nested tuple.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+               final String[] names = {"one", "two", "three"};
+
+               List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> tuples = new ArrayList<>();
+               for (int i = 0; i < names.length; i++) {
+                       // both fields of the nested tuple carry the 1-based position
+                       Tuple2<IntValue, IntValue> nested = new Tuple2<>(new IntValue(i + 1), new IntValue(i + 1));
+                       tuples.add(new Tuple2<>(nested, new StringValue(names[i])));
+               }
+
+               TupleTypeInfo<Tuple2<IntValue, IntValue>> nestedType =
+                       new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO);
+               TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> tupleType =
+                       new TupleTypeInfo<>(nestedType, ValueTypeInfo.STRING_VALUE_TYPE_INFO);
+
+               return env.fromCollection(tuples, tupleType);
+       }
+
+       /**
+        * Creates a data set of seven {@code ((IntValue, IntValue), StringValue)} tuples, in
+        * insertion order, with explicit type information for the nested tuple.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+               // one row per record: {nested.f0, nested.f1}
+               final int[][] nestedFields = {{1, 3}, {1, 2}, {2, 1}, {2, 2}, {3, 3}, {3, 6}, {4, 9}};
+               final String[] labels = {"a", "a", "a", "b", "c", "c", "c"};
+
+               List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> tuples = new ArrayList<>();
+               for (int i = 0; i < labels.length; i++) {
+                       Tuple2<IntValue, IntValue> nested =
+                               new Tuple2<>(new IntValue(nestedFields[i][0]), new IntValue(nestedFields[i][1]));
+                       tuples.add(new Tuple2<>(nested, new StringValue(labels[i])));
+               }
+
+               TupleTypeInfo<Tuple2<IntValue, IntValue>> nestedType =
+                       new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO);
+               TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> tupleType =
+                       new TupleTypeInfo<>(nestedType, ValueTypeInfo.STRING_VALUE_TYPE_INFO);
+
+               return env.fromCollection(tuples, tupleType);
+       }
+
+       /**
+        * Creates a data set of seven {@code ((IntValue, IntValue), StringValue, IntValue)} tuples,
+        * in insertion order, with explicit type information for the nested tuple.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+               // one row per record: {nested.f0, nested.f1}
+               final int[][] nestedFields = {{1, 3}, {1, 2}, {2, 1}, {2, 2}, {3, 3}, {3, 6}, {4, 9}};
+               final String[] labels = {"a", "a", "a", "b", "c", "c", "c"};
+               final int[] lastField = {2, 1, 3, 4, 5, 6, 7};
+
+               List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> tuples = new ArrayList<>();
+               for (int i = 0; i < labels.length; i++) {
+                       Tuple2<IntValue, IntValue> nested =
+                               new Tuple2<IntValue, IntValue>(new IntValue(nestedFields[i][0]), new IntValue(nestedFields[i][1]));
+                       tuples.add(new Tuple3<>(nested, new StringValue(labels[i]), new IntValue(lastField[i])));
+               }
+
+               TupleTypeInfo<Tuple2<IntValue, IntValue>> nestedType =
+                       new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO);
+               TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> tupleType =
+                       new TupleTypeInfo<>(
+                               nestedType,
+                               ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+                               ValueTypeInfo.INT_VALUE_TYPE_INFO);
+
+               return env.fromCollection(tuples, tupleType);
+       }
+       
+       /**
+        * Creates a shuffled data set of eight {@code StringValue} records.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) {
+               final String[] texts = {
+                       "Hi",
+                       "Hello",
+                       "Hello world",
+                       "Hello world, how are you?",
+                       "I am fine.",
+                       "Luke Skywalker",
+                       "Random comment",
+                       "LOL"
+               };
+
+               List<StringValue> values = new ArrayList<>(texts.length);
+               for (String text : texts) {
+                       values.add(new StringValue(text));
+               }
+
+               Collections.shuffle(values);
+
+               return env.fromCollection(values);
+       }
+
+       /**
+        * Creates a shuffled data set of 15 {@code IntValue} records in which each value
+        * {@code v} from 1 to 5 occurs exactly {@code v} times.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) {
+               List<IntValue> values = new ArrayList<>();
+
+               for (int value = 1; value <= 5; value++) {
+                       // a fresh IntValue per record: the value type is mutable, so instances are not shared
+                       for (int copy = 0; copy < value; copy++) {
+                               values.add(new IntValue(value));
+                       }
+               }
+
+               Collections.shuffle(values);
+
+               return env.fromCollection(values);
+       }
+
+       /**
+        * Creates a shuffled data set of 21 {@link CustomType} records. The {@code long}
+        * constructor argument counts 0..20 along with the record index.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
+               final int[] ints = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6};
+               final String[] texts = {
+                       "Hi", "Hello", "Hello world", "Hello world, how are you?", "I am fine.", "Luke Skywalker",
+                       "Comment#1", "Comment#2", "Comment#3", "Comment#4", "Comment#5",
+                       "Comment#6", "Comment#7", "Comment#8", "Comment#9", "Comment#10",
+                       "Comment#11", "Comment#12", "Comment#13", "Comment#14", "Comment#15"
+               };
+
+               List<CustomType> records = new ArrayList<>(texts.length);
+               for (int i = 0; i < texts.length; i++) {
+                       records.add(new CustomType(ints[i], (long) i, texts[i]));
+               }
+
+               Collections.shuffle(records);
+
+               return env.fromCollection(records);
+       }
+
+       /**
+        * Creates a shuffled data set of three {@link CustomType} records.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the shuffled collection
+        */
+       public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
+               final int[] ints = {1, 2, 2};
+               final String[] texts = {"Hi", "Hello", "Hello world"};
+
+               List<CustomType> records = new ArrayList<>(texts.length);
+               for (int i = 0; i < texts.length; i++) {
+                       // the long argument counts 0..2 along with the record index
+                       records.add(new CustomType(ints[i], (long) i, texts[i]));
+               }
+
+               Collections.shuffle(records);
+
+               return env.fromCollection(records);
+       }
+
+       /**
+        * Serializable record with three public fields of mutable value types.
+        */
+       public static class CustomType implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+
+               public IntValue myInt;
+               public LongValue myLong;
+               public StringValue myString;
+
+               /** Leaves all three fields {@code null}. */
+               public CustomType() {
+               }
+
+               /**
+                * Wraps the given primitives and string in newly created value objects.
+                *
+                * @param i value for {@link #myInt}
+                * @param l value for {@link #myLong}
+                * @param s value for {@link #myString}
+                */
+               public CustomType(int i, long l, String s) {
+                       this.myInt = new IntValue(i);
+                       this.myLong = new LongValue(l);
+                       this.myString = new StringValue(s);
+               }
+
+               /** Renders the three fields separated by commas, in declaration order. */
+               @Override
+               public String toString() {
+                       final String separator = ",";
+                       return myInt + separator + myLong + separator + myString;
+               }
+       }
+
+       /**
+        * Orders {@link CustomType} records by {@code myInt}, then {@code myLong}, then
+        * {@code myString}.
+        *
+        * <p>The numeric fields are compared with {@link Integer#compare(int, int)} and
+        * {@link Long#compare(long, long)} rather than by subtraction: an {@code int} difference
+        * can overflow, and narrowing a {@code long} difference to {@code int} can change its sign
+        * or turn a non-zero difference into zero.
+        */
+       public static class CustomTypeComparator implements Comparator<CustomType> {
+
+               /**
+                * @param o1 first record; its three fields must be non-null
+                * @param o2 second record; its three fields must be non-null
+                * @return a negative, zero or positive number as {@code o1} sorts before, equal to
+                *         or after {@code o2}
+                */
+               @Override
+               public int compare(CustomType o1, CustomType o2) {
+                       int byInt = Integer.compare(o1.myInt.getValue(), o2.myInt.getValue());
+                       if (byInt != 0) {
+                               return byInt;
+                       }
+
+                       int byLong = Long.compare(o1.myLong.getValue(), o2.myLong.getValue());
+                       if (byLong != 0) {
+                               return byLong;
+                       }
+
+                       return o1.myString.getValue().compareTo(o2.myString.getValue());
+               }
+
+       }
+
+       /**
+        * Creates a data set of three seven-field tuples, in insertion order. For record
+        * {@code n} (1..3) the numeric fields are {@code n}, {@code 10n}, {@code 100n},
+        * {@code 1000n} and {@code 10000n}.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
+               final String[] ordinals = {"First", "Second", "Third"};
+               final String[] cardinals = {"One", "Two", "Three"};
+
+               List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> tuples = new ArrayList<>();
+               for (int n = 1; n <= ordinals.length; n++) {
+                       tuples.add(new Tuple7<>(
+                               new IntValue(n),
+                               new StringValue(ordinals[n - 1]),
+                               new IntValue(10 * n),
+                               new IntValue(100 * n),
+                               new LongValue(1000L * n),
+                               new StringValue(cardinals[n - 1]),
+                               new LongValue(10000L * n)));
+               }
+
+               return env.fromCollection(tuples);
+       }
+       
+       /**
+        * Creates a data set of three seven-field tuples, in insertion order. For record
+        * {@code n} (1..3) the fields are {@code 10000n}, {@code 10n}, {@code 100n},
+        * {@code 1000n}, a cardinal word, {@code n} and an ordinal word.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+               final String[] cardinals = {"One", "Two", "Three"};
+               final String[] ordinals = {"First", "Second", "Third"};
+
+               List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> tuples = new ArrayList<>();
+               for (int n = 1; n <= cardinals.length; n++) {
+                       tuples.add(new Tuple7<>(
+                               new LongValue(10000L * n),
+                               new IntValue(10 * n),
+                               new IntValue(100 * n),
+                               new LongValue(1000L * n),
+                               new StringValue(cardinals[n - 1]),
+                               new IntValue(n),
+                               new StringValue(ordinals[n - 1])));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Creates a data set of three {@link POJO} records, in insertion order.
+        *
+        * <p>The constructor arguments are, in order: number, str, f0, f1.myInt, f1.myLong,
+        * f1.myString and nestedPojo.longNumber.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
+               final String[] ordinals = {"First", "Second", "Third"};
+               final String[] cardinals = {"One", "Two", "Three"};
+
+               List<POJO> pojos = new ArrayList<>();
+               for (int n = 1; n <= ordinals.length; n++) {
+                       pojos.add(new POJO(n, ordinals[n - 1], 10 * n, 100 * n, 1000L * n, cardinals[n - 1], 10000L * n));
+               }
+
+               return env.fromCollection(pojos);
+       }
+
+       /**
+        * Creates a data set of eight {@link POJO} records containing duplicates: five equal
+        * "First" records, one "Second" record and two equal "Third" records, in that order.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
+               List<POJO> pojos = new ArrayList<>();
+
+               // 5x, each one a separate instance
+               for (int copy = 0; copy < 5; copy++) {
+                       pojos.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+               }
+
+               pojos.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+
+               // 2x, each one a separate instance
+               for (int copy = 0; copy < 2; copy++) {
+                       pojos.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+               }
+
+               return env.fromCollection(pojos);
+       }
+
+       /**
+        * Creates a data set of eight {@link POJO} records numbered 1..8, in insertion order.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+               List<POJO> pojos = new ArrayList<>();
+
+               pojos.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L));
+               pojos.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+               pojos.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+               pojos.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+               pojos.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+               pojos.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+               pojos.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L));
+               pojos.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+
+               return env.fromCollection(pojos);
+       }
+
+       /**
+        * Test record with public fields of mutable value types, a nested tuple holding a
+        * {@link CustomType}, a nested {@link NestedPojo} and a transient field.
+        */
+       public static class POJO {
+               public IntValue number;
+               public StringValue str;
+               public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
+               public NestedPojo nestedPojo;
+               public transient LongValue ignoreMe;
+
+               /**
+                * Populates every field except {@link #ignoreMe}.
+                *
+                * @param i0 value for {@link #number}
+                * @param s0 value for {@link #str}
+                * @param i1 value for field 0 of {@link #nestedTupleWithCustom}
+                * @param i2 {@code myInt} of the nested {@link CustomType}
+                * @param l0 {@code myLong} of the nested {@link CustomType}
+                * @param s1 {@code myString} of the nested {@link CustomType}
+                * @param l1 value for {@code nestedPojo.longNumber}
+                */
+               public POJO(int i0, String s0,
+                                       int i1, int i2, long l0, String s1,
+                                       long l1) {
+                       this.number = new IntValue(i0);
+                       this.str = new StringValue(s0);
+                       this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1));
+                       this.nestedPojo = new NestedPojo();
+                       this.nestedPojo.longNumber = new LongValue(l1);
+               }
+
+               /** Leaves all fields {@code null}. */
+               public POJO() {
+               }
+
+               /**
+                * Renders number, str, the nested tuple and {@code nestedPojo.longNumber} separated
+                * by single spaces. A {@code null} {@link #nestedPojo}, as left by the no-arg
+                * constructor, is rendered as {@code null} like the other unset fields instead of
+                * causing a {@link NullPointerException}.
+                */
+               @Override
+               public String toString() {
+                       LongValue nestedNumber = (nestedPojo == null) ? null : nestedPojo.longNumber;
+                       return number + " " + str + " " + nestedTupleWithCustom + " " + nestedNumber;
+               }
+       }
+
+       /**
+        * Holder for a single {@code LongValue}; it also declares a static field.
+        */
+       public static class NestedPojo {
+               public LongValue longNumber;
+               public static Object ignoreMe;
+
+               /** Leaves {@link #longNumber} {@code null}. */
+               public NestedPojo() {
+               }
+       }
+
+       /**
+        * Creates a data set of six {@link CrazyNested} records, in insertion order: one built
+        * from "aa", two from "bb" and three from "cc".
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
+               final String[] innermostValues = {"aa", "bb", "bb", "cc", "cc", "cc"};
+
+               List<CrazyNested> records = new ArrayList<>(innermostValues.length);
+               for (String value : innermostValues) {
+                       records.add(new CrazyNested(value));
+               }
+
+               return env.fromCollection(records);
+       }
+
+       /**
+        * Root of a four-level chain of nested objects ({@code nest_Lvl1} .. {@code nest_Lvl4})
+        * that ends in a {@code StringValue}.
+        */
+       public static class CrazyNested {
+               public CrazyNestedL1 nest_Lvl1;
+               public LongValue something; // test proper null-value handling
+
+               /** Leaves both fields {@code null}. */
+               public CrazyNested() {
+               }
+
+               /**
+                * Additional constructor to set all fields to non-null values.
+                *
+                * @param set    text stored in the innermost {@code f1nal} field
+                * @param second text stored in {@code nest_Lvl1.a}
+                * @param s      number stored in {@link #something}
+                */
+               public CrazyNested(String set, String second, long s) {
+                       this(set);
+                       something = new LongValue(s);
+                       nest_Lvl1.a = new StringValue(second);
+               }
+
+               /**
+                * Builds the whole nesting chain and stores {@code set} in the innermost
+                * {@code f1nal} field; {@link #something} stays {@code null}.
+                *
+                * @param set text stored in the innermost {@code f1nal} field
+                */
+               public CrazyNested(String set) {
+                       nest_Lvl1 = new CrazyNestedL1();
+                       nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
+                       nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
+                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4();
+                       nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = new StringValue(set);
+               }
+       }
+
+       /** First nesting level: two value fields plus the link to level 2. */
+       public static class CrazyNestedL1 {
+               public StringValue a;
+               public IntValue b;
+
+               public CrazyNestedL2 nest_Lvl2;
+       }
+
+       /** Second nesting level: only links to level 3. */
+       public static class CrazyNestedL2 {
+               public CrazyNestedL3 nest_Lvl3;
+       }
+
+       /** Third nesting level: only links to level 4. */
+       public static class CrazyNestedL3 {
+               public CrazyNestedL4 nest_Lvl4;
+       }
+
+       /** Innermost nesting level: holds the {@code StringValue} payload. */
+       public static class CrazyNestedL4 {
+               public StringValue f1nal;
+       }
+
+       // Copied from TypeExtractorTest
+       /**
+        * A {@code Tuple3<StringValue, StringValue, LongValue>} subclass that adds one public field.
+        */
+       public static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> {
+               private static final long serialVersionUID = 1L;
+
+               /** Extra field declared next to the inherited tuple fields. */
+               public IntValue special;
+       }
+
+       /**
+        * {@link FromTuple} with a convenience constructor that fills the extra field and the
+        * third tuple field.
+        */
+       public static class FromTupleWithCTor extends FromTuple {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Leaves all fields unset. */
+               public FromTupleWithCTor() {
+               }
+
+               /**
+                * @param special    value for the {@code special} field
+                * @param tupleField value for tuple position 2
+                */
+               public FromTupleWithCTor(int special, long tupleField) {
+                       this.special = new IntValue(special);
+                       // tuple position 2 is the public field f2
+                       this.f2 = new LongValue(tupleField);
+               }
+       }
+
+       /**
+        * Creates a data set of five {@link FromTupleWithCTor} records, in insertion order:
+        * three built from {@code (1, 10L)} followed by two built from {@code (2, 20L)}.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
+               List<FromTupleWithCTor> records = new ArrayList<>();
+
+               // 3x
+               for (int copy = 0; copy < 3; copy++) {
+                       records.add(new FromTupleWithCTor(1, 10L));
+               }
+               // 2x
+               for (int copy = 0; copy < 2; copy++) {
+                       records.add(new FromTupleWithCTor(2, 20L));
+               }
+
+               return env.fromCollection(records);
+       }
+
+       public static class PojoContainingTupleAndWritable {
+               public IntValue someInt;
+               public StringValue someString;
+               public IntWritable hadoopFan;
+               public Tuple2<LongValue, LongValue> theTuple;
+
+               public PojoContainingTupleAndWritable() {
+               }
+
+               public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+                       hadoopFan = new IntWritable(i);
+                       someInt = new IntValue(i);
+                       theTuple = new Tuple2<>(new LongValue(l1), new 
LongValue(l2));
+               }
+       }
+
+       /**
+        * Creates a data set of six {@link PojoContainingTupleAndWritable} records, in insertion
+        * order: one built from {@code (1, 10L, 100L)} followed by five built from
+        * {@code (2, 20L, 200L)}.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+               List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+               // 1x
+               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L));
+               // 5x
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               return env.fromCollection(data);
+       }
+
+
+
+       /**
+        * Creates a data set of six {@link PojoContainingTupleAndWritable} records, in insertion
+        * order: one with int value 1 and five with int value 2 whose tuple fields differ.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+               List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+               // one record with int value 1
+               data.add(new PojoContainingTupleAndWritable(1, 10L, 100L));
+               // five records with int value 2 and varying tuple fields
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
+               data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
+               data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
+               data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
+               return env.fromCollection(data);
+       }
+
+       /**
+        * Creates a data set of four {@code (IntValue, CrazyNested, POJO)} tuples, in insertion
+        * order: three equal tuples with key 1 followed by one tuple with key 2.
+        *
+        * @param env environment used to create the data set
+        * @return the data set built from the collection
+        */
+       public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+               List<Tuple3<IntValue, CrazyNested, POJO>> tuples = new ArrayList<>();
+
+               // 3x
+               for (int copy = 0; copy < 3; copy++) {
+                       Tuple3<IntValue, CrazyNested, POJO> keyOne = new Tuple3<IntValue, CrazyNested, POJO>(
+                               new IntValue(1),
+                               new CrazyNested("one", "uno", 1L),
+                               new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+                       tuples.add(keyOne);
+               }
+
+               // 1x; POJO is not initialized according to the first two fields.
+               Tuple3<IntValue, CrazyNested, POJO> keyTwo = new Tuple3<IntValue, CrazyNested, POJO>(
+                       new IntValue(2),
+                       new CrazyNested("two", "duo", 2L),
+                       new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+               tuples.add(keyTwo);
+
+               return env.fromCollection(tuples);
+       }
+
+       public static class Pojo1 {
+               public StringValue a;
+               public StringValue b;
+
+               public Pojo1() {}
+
+               public Pojo1(String a, String b) {
+                       this.a = new StringValue(a);
+                       this.b = new StringValue(b);
+               }
+       }
+
+       /** Holder for two {@code StringValue} fields, {@code a2} and {@code b2}. */
+       public static class Pojo2 {
+               public StringValue a2;
+
+               public StringValue b2;
+       }
+
+       public static class PojoWithMultiplePojos {
+               public Pojo1 p1;
+               public Pojo2 p2;
+               public IntValue i0;
+
+               public PojoWithMultiplePojos() {
+               }
+
+               public PojoWithMultiplePojos(String a, String b, String a1, 
String b1, int i0) {
+                       p1 = new Pojo1();
+                       p1.a = new StringValue(a);
+                       p1.b = new StringValue(b);
+                       p2 = new Pojo2();
+                       p2.a2 = new StringValue(a1);
+                       p2.b2 = new StringValue(b1);
+                       this.i0 = new IntValue(i0);
+               }
+       }
+
+       /**
+        * Builds a six-element data set of {@link PojoWithMultiplePojos}: one record
+        * with {@code i0 = 1}, three identical records with {@code i0 = 2} and two
+        * identical records with {@code i0 = 3}.
+        *
+        * @param env environment used to turn the collection into a data set
+        * @return the data set returned by {@code env.fromCollection}
+        */
+       public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
+               List<PojoWithMultiplePojos> records = new ArrayList<>();
+
+               records.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+               // Duplicates are separate instances, not one shared object.
+               for (int n = 0; n < 3; n++) {
+                       records.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+               }
+               for (int n = 0; n < 2; n++) {
+                       records.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+               }
+
+               return env.fromCollection(records);
+       }
+
+       public enum Category {
+               CAT_A, CAT_B;
+       }
+
+       /**
+        * POJO combining a {@link StringValue} field with a {@link Date} and a
+        * {@link Category} enum field. It declares no constructor, so all fields
+        * start unset.
+        */
+       public static class PojoWithDateAndEnum {
+               public StringValue group;
+               public Date date;
+               public Category cat;
+       }
+       
+       public static DataSet<PojoWithDateAndEnum> 
getPojoWithDateAndEnum(ExecutionEnvironment env) {
+               List<PojoWithDateAndEnum> data = new 
ArrayList<PojoWithDateAndEnum>();
+               
+               PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+               one.group = new StringValue("a");
+               one.date = new Date(666);
+               one.cat = Category.CAT_A;
+               data.add(one);
+               
+               PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+               two.group = new StringValue("a");
+               two.date = new Date(666);
+               two.cat = Category.CAT_A;
+               data.add(two);
+               
+               PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+               three.group = new StringValue("b");
+               three.date = new Date(666);
+               three.cat = Category.CAT_B;
+               data.add(three);
+               
+               return env.fromCollection(data);
+       }
+
+       public static class PojoWithCollection {
+               public List<Pojo1> pojos;
+               public IntValue key;
+               public java.sql.Date sqlDate;
+               public BigInteger bigInt;
+               public BigDecimal bigDecimalKeepItNull;
+               public BigInt scalaBigInt;
+               public List<Object> mixed;
+
+               @Override
+               public String toString() {
+                       return "PojoWithCollection{" +
+                                       "pojos.size()=" + pojos.size() +
+                                       ", key=" + key +
+                                       ", sqlDate=" + sqlDate +
+                                       ", bigInt=" + bigInt +
+                                       ", bigDecimalKeepItNull=" + 
bigDecimalKeepItNull +
+                                       ", scalaBigInt=" + scalaBigInt +
+                                       ", mixed=" + mixed +
+                                       '}';
+               }
+       }
+
+       public static class PojoWithCollectionGeneric {
+               public List<Pojo1> pojos;
+               public IntValue key;
+               public java.sql.Date sqlDate;
+               public BigInteger bigInt;
+               public BigDecimal bigDecimalKeepItNull;
+               public BigInt scalaBigInt;
+               public List<Object> mixed;
+               private PojoWithDateAndEnum makeMeGeneric;
+
+               @Override
+               public String toString() {
+                       return "PojoWithCollection{" +
+                                       "pojos.size()=" + pojos.size() +
+                                       ", key=" + key +
+                                       ", sqlDate=" + sqlDate +
+                                       ", bigInt=" + bigInt +
+                                       ", bigDecimalKeepItNull=" + 
bigDecimalKeepItNull +
+                                       ", scalaBigInt=" + scalaBigInt +
+                                       ", mixed=" + mixed +
+                                       '}';
+               }
+       }
+
+       /**
+        * Builds a two-element data set of {@link PojoWithCollection}. Both records use
+        * key 0 and the same {@code bigInt}; they differ in their {@code pojos} list,
+        * {@code scalaBigInt} and {@code sqlDate}. Only the first record gets a
+        * {@code mixed} list; the second leaves it unset.
+        *
+        * @param env environment used to turn the collection into a data set
+        * @return the data set returned by {@code env.fromCollection}
+        */
+       public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+               List<PojoWithCollection> data = new ArrayList<>();
+
+               List<Pojo1> pojosList1 = new ArrayList<>();
+               pojosList1.add(new Pojo1("a", "aa"));
+               pojosList1.add(new Pojo1("b", "bb"));
+
+               List<Pojo1> pojosList2 = new ArrayList<>();
+               pojosList2.add(new Pojo1("a2", "aa2"));
+               pojosList2.add(new Pojo1("b2", "bb2"));
+
+               PojoWithCollection pwc1 = new PojoWithCollection();
+               pwc1.pojos = pojosList1;
+               pwc1.key = new IntValue(0);
+               // Larger than Long.MAX_VALUE, so the value cannot fit in a long.
+               pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+               pwc1.scalaBigInt = BigInt.int2bigInt(10);
+               pwc1.bigDecimalKeepItNull = null;
+
+               // use calendar to make it stable across time zones
+               // GregorianCalendar months are 0-based, so 4 is May. Written as a plain decimal
+               // literal; the former "04" was an octal literal that only happened to equal 4.
+               GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
+               pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+               pwc1.mixed = new ArrayList<>();
+               Map<StringValue, IntValue> map = new HashMap<>();
+               map.put(new StringValue("someKey"), new IntValue(1));
+               pwc1.mixed.add(map);
+               pwc1.mixed.add(new File("/this/is/wrong"));
+               pwc1.mixed.add("uhlala");
+
+               PojoWithCollection pwc2 = new PojoWithCollection();
+               pwc2.pojos = pojosList2;
+               pwc2.key = new IntValue(0);
+               pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+               pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+               pwc2.bigDecimalKeepItNull = null;
+
+               GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+               pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+               data.add(pwc1);
+               data.add(pwc2);
+
+               return env.fromCollection(data);
+       }
+
+}
+

Reply via email to