[FLINK-2725] Add Max/Min/Sum aggregation for mutable types. This closes #1191
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da248b15 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da248b15 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da248b15 Branch: refs/heads/master Commit: da248b15e1b1dbe09345d3bb186dc815a45e9a3c Parents: 6491559 Author: Greg Hogan <c...@greghogan.com> Authored: Tue Sep 22 13:01:47 2015 -0400 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Oct 19 15:39:28 2015 +0200 ---------------------------------------------------------------------- .../aggregation/MaxAggregationFunction.java | 83 ++- .../aggregation/MinAggregationFunction.java | 85 ++- .../aggregation/SumAggregationFunction.java | 190 ++++- .../flink/api/java/typeutils/ValueTypeInfo.java | 13 +- .../test/javaApiOperators/AggregateITCase.java | 71 ++ .../util/ValueCollectionDataSets.java | 730 +++++++++++++++++++ 6 files changed, 1110 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java index f25ca87..59d9e13 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java @@ -18,35 +18,74 @@ package org.apache.flink.api.java.aggregation; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.ResettableValue; -public class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> { - private static final long serialVersionUID = 1L; 
- private T value; +public abstract class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> { + private static final long serialVersionUID = 1L; @Override - public void initializeAggregate() { - value = null; + public String toString() { + return "MAX"; } - @Override - public void aggregate(T val) { - if (value != null) { - int cmp = value.compareTo(val); - value = (cmp > 0) ? value : val; - } else { - value = val; + // -------------------------------------------------------------------------------------------- + + public static final class ImmutableMaxAgg<U extends Comparable<U>> extends MaxAggregationFunction<U> { + private static final long serialVersionUID = 1L; + + private U value; + + @Override + public void initializeAggregate() { + value = null; } - } - @Override - public T getAggregate() { - return value; + @Override + public void aggregate(U val) { + if (value != null) { + int cmp = value.compareTo(val); + value = (cmp > 0) ? value : val; + } else { + value = val; + } + } + + @Override + public U getAggregate() { + return value; + } } - @Override - public String toString() { - return "MAX"; + // -------------------------------------------------------------------------------------------- + + public static final class MutableMaxAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MaxAggregationFunction<U> { + private static final long serialVersionUID = 1L; + + private U value; + + @Override + public void initializeAggregate() { + value = null; + } + + @Override + public void aggregate(U val) { + if (value != null) { + int cmp = value.compareTo(val); + if (cmp < 0) { + value.setValue(val); + } + } else { + value = val.copy(); + } + } + + @Override + public U getAggregate() { + return value; + } } // -------------------------------------------------------------------------------------------- @@ -58,7 +97,11 @@ public class MaxAggregationFunction<T extends Comparable<T>> extends Aggregation @Override public 
<T> AggregationFunction<T> createAggregationFunction(Class<T> type) { if (Comparable.class.isAssignableFrom(type)) { - return (AggregationFunction<T>) new MaxAggregationFunction(); + if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) { + return (AggregationFunction<T>) new MutableMaxAgg(); + } else { + return (AggregationFunction<T>) new ImmutableMaxAgg(); + } } else { throw new UnsupportedAggregationTypeException("The type " + type.getName() + " is not supported for maximum aggregation. " + http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java index faf28a7..b72b0f4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java @@ -18,35 +18,74 @@ package org.apache.flink.api.java.aggregation; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.ResettableValue; -public class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> { - private static final long serialVersionUID = 1L; - private T value; +public abstract class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> { + private static final long serialVersionUID = 1L; @Override - public void initializeAggregate() { - value = null; + public String toString() { + return "MIN"; } - @Override - public void aggregate(T val) { - if (value != null) { - int cmp = value.compareTo(val); - value = (cmp < 0) ? 
value : val; - } else { - value = val; + // -------------------------------------------------------------------------------------------- + + public static final class ImmutableMinAgg<U extends Comparable<U>> extends MinAggregationFunction<U> { + private static final long serialVersionUID = 1L; + + private U value; + + @Override + public void initializeAggregate() { + value = null; } - } - @Override - public T getAggregate() { - return value; + @Override + public void aggregate(U val) { + if (value != null) { + int cmp = value.compareTo(val); + value = (cmp < 0) ? value : val; + } else { + value = val; + } + } + + @Override + public U getAggregate() { + return value; + } } - - @Override - public String toString() { - return "MIN"; + + // -------------------------------------------------------------------------------------------- + + public static final class MutableMinAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MinAggregationFunction<U> { + private static final long serialVersionUID = 1L; + + private U value; + + @Override + public void initializeAggregate() { + value = null; + } + + @Override + public void aggregate(U val) { + if (value != null) { + int cmp = value.compareTo(val); + if (cmp > 0) { + value.setValue(val); + } + } else { + value = val.copy(); + } + } + + @Override + public U getAggregate() { + return value; + } } // -------------------------------------------------------------------------------------------- @@ -58,7 +97,11 @@ public class MinAggregationFunction<T extends Comparable<T>> extends Aggregation @Override public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) { if (Comparable.class.isAssignableFrom(type)) { - return (AggregationFunction<T>) new MinAggregationFunction(); + if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) { + return (AggregationFunction<T>) new MutableMinAgg(); + } else { + return (AggregationFunction<T>) new ImmutableMinAgg(); + 
} } else { throw new UnsupportedAggregationTypeException("The type " + type.getName() + " is not supported for minimum aggregation. " + http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java index 24e8f31..ad4644b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java @@ -18,21 +18,27 @@ package org.apache.flink.api.java.aggregation; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.ShortValue; public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { - + private static final long serialVersionUID = 1L; @Override public String toString() { return "SUM"; } - + // -------------------------------------------------------------------------------------------- - + public static final class ByteSumAgg extends SumAggregationFunction<Byte> { private static final long serialVersionUID = 1L; - + private long agg; @Override @@ -50,10 +56,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { return (byte) agg; } } - + + public static final class ByteValueSumAgg extends SumAggregationFunction<ByteValue> { + private static final long serialVersionUID = 1L; + + private long agg; + + @Override + public void initializeAggregate() { + agg = 0; + } + + @Override + public void aggregate(ByteValue value) { + agg += value.getValue(); + } + + 
@Override + public ByteValue getAggregate() { + return new ByteValue((byte) agg); + } + } + public static final class ShortSumAgg extends SumAggregationFunction<Short> { private static final long serialVersionUID = 1L; - + private long agg; @Override @@ -71,10 +98,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { return (short) agg; } } - + + public static final class ShortValueSumAgg extends SumAggregationFunction<ShortValue> { + private static final long serialVersionUID = 1L; + + private long agg; + + @Override + public void initializeAggregate() { + agg = 0; + } + + @Override + public void aggregate(ShortValue value) { + agg += value.getValue(); + } + + @Override + public ShortValue getAggregate() { + return new ShortValue((short) agg); + } + } + public static final class IntSumAgg extends SumAggregationFunction<Integer> { private static final long serialVersionUID = 1L; - + private long agg; @Override @@ -92,10 +140,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { return (int) agg; } } - + + public static final class IntValueSumAgg extends SumAggregationFunction<IntValue> { + private static final long serialVersionUID = 1L; + + private long agg; + + @Override + public void initializeAggregate() { + agg = 0; + } + + @Override + public void aggregate(IntValue value) { + agg += value.getValue(); + } + + @Override + public IntValue getAggregate() { + return new IntValue((int) agg); + } + } + public static final class LongSumAgg extends SumAggregationFunction<Long> { private static final long serialVersionUID = 1L; - + private long agg; @Override @@ -113,11 +182,32 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { return agg; } } - + + public static final class LongValueSumAgg extends SumAggregationFunction<LongValue> { + private static final long serialVersionUID = 1L; + + private long agg; + + @Override + public void initializeAggregate() { + agg = 0L; + } 
+ + @Override + public void aggregate(LongValue value) { + agg += value.getValue(); + } + + @Override + public LongValue getAggregate() { + return new LongValue(agg); + } + } + public static final class FloatSumAgg extends SumAggregationFunction<Float> { private static final long serialVersionUID = 1L; - - private float agg; + + private double agg; @Override public void initializeAggregate() { @@ -131,13 +221,34 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { @Override public Float getAggregate() { - return agg; + return (float) agg; + } + } + + public static final class FloatValueSumAgg extends SumAggregationFunction<FloatValue> { + private static final long serialVersionUID = 1L; + + private double agg; + + @Override + public void initializeAggregate() { + agg = 0.0f; + } + + @Override + public void aggregate(FloatValue value) { + agg += value.getValue(); + } + + @Override + public FloatValue getAggregate() { + return new FloatValue((float) agg); } } - + public static final class DoubleSumAgg extends SumAggregationFunction<Double> { private static final long serialVersionUID = 1L; - + private double agg; @Override @@ -155,36 +266,75 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> { return agg; } } - + + public static final class DoubleValueSumAgg extends SumAggregationFunction<DoubleValue> { + private static final long serialVersionUID = 1L; + + private double agg; + + @Override + public void initializeAggregate() { + agg = 0.0; + } + + @Override + public void aggregate(DoubleValue value) { + agg += value.getValue(); + } + + @Override + public DoubleValue getAggregate() { + return new DoubleValue(agg); + } + } + // -------------------------------------------------------------------------------------------- - + public static final class SumAggregationFunctionFactory implements AggregationFunctionFactory { private static final long serialVersionUID = 1L; - + @SuppressWarnings("unchecked") 
@Override public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) { if (type == Long.class) { return (AggregationFunction<T>) new LongSumAgg(); } + else if (type == LongValue.class) { + return (AggregationFunction<T>) new LongValueSumAgg(); + } else if (type == Integer.class) { return (AggregationFunction<T>) new IntSumAgg(); } + else if (type == IntValue.class) { + return (AggregationFunction<T>) new IntValueSumAgg(); + } else if (type == Double.class) { return (AggregationFunction<T>) new DoubleSumAgg(); } + else if (type == DoubleValue.class) { + return (AggregationFunction<T>) new DoubleValueSumAgg(); + } else if (type == Float.class) { return (AggregationFunction<T>) new FloatSumAgg(); } + else if (type == FloatValue.class) { + return (AggregationFunction<T>) new FloatValueSumAgg(); + } else if (type == Byte.class) { return (AggregationFunction<T>) new ByteSumAgg(); } + else if (type == ByteValue.class) { + return (AggregationFunction<T>) new ByteValueSumAgg(); + } else if (type == Short.class) { return (AggregationFunction<T>) new ShortSumAgg(); } + else if (type == ShortValue.class) { + return (AggregationFunction<T>) new ShortValueSumAgg(); + } else { throw new UnsupportedAggregationTypeException("The type " + type.getName() + - " has currently not supported for built-in sum aggregations."); + " is currently not supported for built-in sum aggregations."); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index 0b4823e..5187de7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java 
@@ -51,7 +51,18 @@ import org.apache.flink.types.Value; public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> { private static final long serialVersionUID = 1L; - + + public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class); + public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class); + public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class); + public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class); + public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class); + public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class); + public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class); + public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class); + public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class); + public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class); + private final Class<T> type; public ValueTypeInfo(Class<T> type) { http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java index d02f228..fc01ce7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java @@ 
-25,7 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -62,6 +66,27 @@ public class AggregateITCase extends MultipleProgramsTestBase { } @Test + public void testFullAggregateOfMutableValueTypes() throws Exception { + /* + * Full Aggregate of mutable value types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds + .aggregate(Aggregations.SUM, 0) + .and(Aggregations.MAX, 1) + .project(0, 1); + + List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect(); + + String expected = "231,6\n"; + + compareResultAsTuples(result, expected); + } + + @Test public void testGroupedAggregate() throws Exception { /* * Grouped Aggregate @@ -87,6 +112,31 @@ public class AggregateITCase extends MultipleProgramsTestBase { } @Test + public void testGroupedAggregateOfMutableValueTypes() throws Exception { + /* + * Grouped Aggregate of mutable value types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.SUM, 0) + .project(1, 0); + + List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect(); + + String expected = 
"1,1\n" + + "2,5\n" + + "3,15\n" + + "4,34\n" + + "5,65\n" + + "6,111\n"; + + compareResultAsTuples(result, expected); + } + + @Test public void testNestedAggregate() throws Exception { /* * Nested Aggregate @@ -106,4 +156,25 @@ public class AggregateITCase extends MultipleProgramsTestBase { compareResultAsTuples(result, expected); } + + @Test + public void testNestedAggregateOfMutableValueTypes() throws Exception { + /* + * Nested Aggregate of mutable value types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.MIN, 0) + .aggregate(Aggregations.MIN, 0) + .project(0); + + List<Tuple1<IntValue>> result = aggregateDs.collect(); + + String expected = "1\n"; + + compareResultAsTuples(result, expected); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java new file mode 100644 index 0000000..04a7bc5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java @@ -0,0 +1,730 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.util; + +import java.io.File; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.apache.hadoop.io.IntWritable; + +import scala.math.BigInt; + +/** + * ####################################################################################################### + * + * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. + * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING! 
+ * + * ####################################################################################################### + */ +public class ValueCollectionDataSets { + + public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) { + List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>(); + + data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi"))); + data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello"))); + data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world"))); + data.add(new Tuple3<>(new IntValue(4), new LongValue(3l), new StringValue("Hello world, how are you?"))); + data.add(new Tuple3<>(new IntValue(5), new LongValue(3l), new StringValue("I am fine."))); + data.add(new Tuple3<>(new IntValue(6), new LongValue(3l), new StringValue("Luke Skywalker"))); + data.add(new Tuple3<>(new IntValue(7), new LongValue(4l), new StringValue("Comment#1"))); + data.add(new Tuple3<>(new IntValue(8), new LongValue(4l), new StringValue("Comment#2"))); + data.add(new Tuple3<>(new IntValue(9), new LongValue(4l), new StringValue("Comment#3"))); + data.add(new Tuple3<>(new IntValue(10), new LongValue(4l), new StringValue("Comment#4"))); + data.add(new Tuple3<>(new IntValue(11), new LongValue(5l), new StringValue("Comment#5"))); + data.add(new Tuple3<>(new IntValue(12), new LongValue(5l), new StringValue("Comment#6"))); + data.add(new Tuple3<>(new IntValue(13), new LongValue(5l), new StringValue("Comment#7"))); + data.add(new Tuple3<>(new IntValue(14), new LongValue(5l), new StringValue("Comment#8"))); + data.add(new Tuple3<>(new IntValue(15), new LongValue(5l), new StringValue("Comment#9"))); + data.add(new Tuple3<>(new IntValue(16), new LongValue(6l), new StringValue("Comment#10"))); + data.add(new Tuple3<>(new IntValue(17), new LongValue(6l), new StringValue("Comment#11"))); + data.add(new Tuple3<>(new IntValue(18), new LongValue(6l), new 
StringValue("Comment#12"))); + data.add(new Tuple3<>(new IntValue(19), new LongValue(6l), new StringValue("Comment#13"))); + data.add(new Tuple3<>(new IntValue(20), new LongValue(6l), new StringValue("Comment#14"))); + data.add(new Tuple3<>(new IntValue(21), new LongValue(6l), new StringValue("Comment#15"))); + + Collections.shuffle(data); + + return env.fromCollection(data); + } + + public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) { + List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>(); + + data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi"))); + data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello"))); + data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world"))); + + Collections.shuffle(data); + + return env.fromCollection(data); + } + + public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) { + List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>(); + + data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l))); + data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l))); + data.add(new Tuple5<>(new IntValue(3), new LongValue(4l), new IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(3), new LongValue(5l), new IntValue(4), new StringValue("ABC"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(3), new LongValue(6l), new IntValue(5), new StringValue("BCD"), new LongValue(3l))); + data.add(new Tuple5<>(new IntValue(4), new LongValue(7l), new IntValue(6), new 
StringValue("CDE"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(4), new LongValue(8l), new IntValue(7), new StringValue("DEF"), new LongValue(1l))); + data.add(new Tuple5<>(new IntValue(4), new LongValue(9l), new IntValue(8), new StringValue("EFG"), new LongValue(1l))); + data.add(new Tuple5<>(new IntValue(4), new LongValue(10l), new IntValue(9), new StringValue("FGH"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(5), new LongValue(11l), new IntValue(10), new StringValue("GHI"), new LongValue(1l))); + data.add(new Tuple5<>(new IntValue(5), new LongValue(12l), new IntValue(11), new StringValue("HIJ"), new LongValue(3l))); + data.add(new Tuple5<>(new IntValue(5), new LongValue(13l), new IntValue(12), new StringValue("IJK"), new LongValue(3l))); + data.add(new Tuple5<>(new IntValue(5), new LongValue(14l), new IntValue(13), new StringValue("JKL"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(5), new LongValue(15l), new IntValue(14), new StringValue("KLM"), new LongValue(2l))); + + Collections.shuffle(data); + + TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new + TupleTypeInfo<>( + ValueTypeInfo.INT_VALUE_TYPE_INFO, + ValueTypeInfo.LONG_VALUE_TYPE_INFO, + ValueTypeInfo.INT_VALUE_TYPE_INFO, + ValueTypeInfo.STRING_VALUE_TYPE_INFO, + ValueTypeInfo.LONG_VALUE_TYPE_INFO + ); + + return env.fromCollection(data, type); + } + + public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) { + List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>(); + + data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l))); + data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l))); + data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new 
StringValue("Hallo Welt wie"), new LongValue(1l)));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
+			TupleTypeInfo<>(
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO
+			);
+
+		return env.fromCollection(data, type);
+	}
+
+	/** Returns three nested tuples ((i, i), name) for i = 1..3, using explicit value type information. */
+	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(1)), new StringValue("one")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("two")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("three")));
+
+		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
+			TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO
+			);
+
+		return env.fromCollection(data, type);
+	}
+
+	/** Returns seven nested tuples in which the first key field (f0.f0) repeats across entries. */
+	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(3)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(2)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(1)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("b")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("c")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(6)), new StringValue("c")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(4), new IntValue(9)), new StringValue("c")));
+
+		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
+			TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO
+			);
+
+		return env.fromCollection(data, type);
+	}
+
+	/** Same nested keys as {@link #getGroupSortedNestedTupleDataSet}, with an additional IntValue field per record. */
+	public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+		List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> data = new ArrayList<>();
+
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7)));
+
+		TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> type = new
+			TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO
+			);
+
+		return env.fromCollection(data, type);
+	}
+
+	/** Returns eight strings in shuffled order. */
+	public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) {
+		List<StringValue> data = new ArrayList<>();
+
+		data.add(new StringValue("Hi"));
+		data.add(new StringValue("Hello"));
+		data.add(new StringValue("Hello world"));
+		data.add(new StringValue("Hello world, how are you?"));
+		data.add(new StringValue("I am fine."));
+		data.add(new StringValue("Luke Skywalker"));
+		data.add(new StringValue("Random comment"));
+		data.add(new StringValue("LOL"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns each value k in 1..5 repeated k times, in shuffled order. */
+	public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) {
+		List<IntValue> data = new ArrayList<>();
+
+		data.add(new IntValue(1));
+		data.add(new IntValue(2));
+		data.add(new IntValue(2));
+		data.add(new IntValue(3));
+		data.add(new IntValue(3));
+		data.add(new IntValue(3));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns 21 {@link CustomType} records (myLong = 0..20) in shuffled order. */
+	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
+		List<CustomType> data = new ArrayList<>();
+
+		data.add(new CustomType(1, 0L, "Hi"));
+		data.add(new CustomType(2, 1L, "Hello"));
+		data.add(new CustomType(2, 2L, "Hello world"));
+		data.add(new CustomType(3, 3L, "Hello world, how are you?"));
+		data.add(new CustomType(3, 4L, "I am fine."));
+		data.add(new CustomType(3, 5L, "Luke Skywalker"));
+		data.add(new CustomType(4, 6L, "Comment#1"));
+		data.add(new CustomType(4, 7L, "Comment#2"));
+		data.add(new CustomType(4, 8L, "Comment#3"));
+		data.add(new CustomType(4, 9L, "Comment#4"));
+		data.add(new CustomType(5, 10L, "Comment#5"));
+		data.add(new CustomType(5, 11L, "Comment#6"));
+		data.add(new CustomType(5, 12L, "Comment#7"));
+		data.add(new CustomType(5, 13L, "Comment#8"));
+		data.add(new CustomType(5, 14L, "Comment#9"));
+		data.add(new CustomType(6, 15L, "Comment#10"));
+		data.add(new CustomType(6, 16L, "Comment#11"));
+		data.add(new CustomType(6, 17L, "Comment#12"));
+		data.add(new CustomType(6, 18L, "Comment#13"));
+		data.add(new CustomType(6, 19L, "Comment#14"));
+		data.add(new CustomType(6, 20L, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns the first three records of {@link #getCustomTypeDataSet} in shuffled order. */
+	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
+		List<CustomType> data = new ArrayList<>();
+
+		data.add(new CustomType(1, 0L, "Hi"));
+		data.add(new CustomType(2, 1L, "Hello"));
+		data.add(new CustomType(2, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	/** Simple record of three value-typed public fields. */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public IntValue myInt;
+		public LongValue myLong;
+		public StringValue myString;
+
+		public CustomType() {
+		}
+
+		public CustomType(int i, long l, String s) {
+			myInt = new IntValue(i);
+			myLong = new LongValue(l);
+			myString = new StringValue(s);
+		}
+
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+	/** Orders {@link CustomType} by myInt, then myLong, then myString. */
+	public static class CustomTypeComparator implements Comparator<CustomType> {
+
+		@Override
+		public int compare(CustomType o1, CustomType o2) {
+			// Integer.compare/Long.compare instead of subtraction: subtracting can overflow, and casting a
+			// long difference to int truncates it, which can flip or zero the sign of the result.
+			int diff = Integer.compare(o1.myInt.getValue(), o2.myInt.getValue());
+			if (diff != 0) {
+				return diff;
+			}
+			diff = Long.compare(o1.myLong.getValue(), o2.myLong.getValue());
+			return diff != 0 ? diff : o1.myString.getValue().compareTo(o2.myString.getValue());
+		}
+
+	}
+
+	/** Returns three Tuple7 records holding the same values as {@link #getSmallPojoDataSet}. */
+	public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
+		List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> data = new ArrayList<>();
+
+		data.add(new Tuple7<>(new IntValue(1), new StringValue("First"), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new LongValue(10000L)));
+		data.add(new Tuple7<>(new IntValue(2), new StringValue("Second"), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new LongValue(20000L)));
+		data.add(new Tuple7<>(new IntValue(3), new StringValue("Third"), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new LongValue(30000L)));
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns the values of {@link #getSmallPojoDataSet} as Tuple7 records in a different field order. */
+	public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+		List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple7<>(new LongValue(10000L), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new IntValue(1), new StringValue("First")));
+		data.add(new Tuple7<>(new LongValue(20000L), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new IntValue(2), new StringValue("Second")));
+		data.add(new Tuple7<>(new LongValue(30000L), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new IntValue(3), new StringValue("Third")));
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns three distinct {@link POJO} records. */
+	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+
+		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns eight {@link POJO} records: the first one five times, the second once, the third twice. */
+	public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+
+		return env.fromCollection(data);
+	}
+
+	/** Returns eight {@link POJO} records whose field values partially overlap across records. */
+	public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
+		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
+		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+
+		return env.fromCollection(data);
+	}
+
+	/** POJO with a nested tuple, a nested POJO and a transient field. */
+	public static class POJO {
+		public IntValue number;
+		public StringValue str;
+		public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
+		public NestedPojo nestedPojo;
+		public transient LongValue ignoreMe;
+
+		public POJO(int i0, String s0,
+						int i1, int i2, long l0, String s1,
+						long l1) {
+			this.number = new IntValue(i0);
+			this.str = new StringValue(s0);
+			this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1));
+			this.nestedPojo = new NestedPojo();
+			this.nestedPojo.longNumber = new LongValue(l1);
+		}
+
+		public POJO() {
+		}
+
+		@Override
+		public String toString() {
+			return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
+		}
+	}
+
+	/** POJO nested inside {@link POJO}; also declares a static field. */
+	public static class NestedPojo {
+		public static Object ignoreMe;
+		public LongValue longNumber;
+
+		public NestedPojo() {
+		}
+	}
+
+	/** Returns six {@link CrazyNested} records whose innermost string is "aa" once, "bb" twice and "cc" three times. */
+	public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
+		List<CrazyNested> data = new ArrayList<>();
+
+		data.add(new CrazyNested("aa"));
+		data.add(new CrazyNested("bb"));
+		data.add(new CrazyNested("bb"));
+		data.add(new CrazyNested("cc"));
+		data.add(new CrazyNested("cc"));
+		data.add(new CrazyNested("cc"));
+
+		return env.fromCollection(data);
+	}
+
+	/** Four-level nested POJO; the single-argument constructor only sets the innermost string. */
+	public static class CrazyNested {
+		public CrazyNestedL1 nest_Lvl1;
+		public LongValue something; // test proper null-value handling
+
+		public CrazyNested() {
+		}
+
+		public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
+			this(set);
+			something = new LongValue(s);
+			nest_Lvl1.a = new StringValue(second);
+		}
+
+		public CrazyNested(String set) {
+			nest_Lvl1 = new CrazyNestedL1();
+			nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
+			nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
+			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4();
+			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = new StringValue(set);
+		}
+	}
+
+	public static class CrazyNestedL1 {
+		public StringValue a;
+		public IntValue b;
+		public CrazyNestedL2 nest_Lvl2;
+	}
+
+	public static class CrazyNestedL2 {
+		public CrazyNestedL3 nest_Lvl3;
+	}
+
+	public static class CrazyNestedL3 {
+		public CrazyNestedL4 nest_Lvl4;
+	}
+
+	public static class CrazyNestedL4 {
+		public StringValue f1nal;
+	}
+
+	// Copied from TypeExtractorTest
+	public static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> {
+		private static final long serialVersionUID = 1L;
+		public IntValue special;
+	}
+
+	/** {@link FromTuple} with a constructor that sets the extra field and tuple field 2. */
+	public static class FromTupleWithCTor extends FromTuple {
+
+		private static final long serialVersionUID = 1L;
+
+		public FromTupleWithCTor() {}
+
+		public FromTupleWithCTor(int special, long tupleField) {
+			this.special = new IntValue(special);
+			this.setField(new LongValue(tupleField), 2);
+		}
+	}
+
+	/** Returns five {@link FromTupleWithCTor} records: (1, 10) three times and (2, 20) twice. */
+	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
+		List<FromTupleWithCTor> data = new ArrayList<>();
+		data.add(new FromTupleWithCTor(1, 10L)); // 3x
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(2, 20L)); // 2x
+		data.add(new FromTupleWithCTor(2, 20L));
+		return env.fromCollection(data);
+	}
+
+	/** POJO mixing value types, a Hadoop Writable and a tuple; someString is never set by the constructors. */
+	public static class PojoContainingTupleAndWritable {
+		public IntValue someInt;
+		public StringValue someString;
+		public IntWritable hadoopFan;
+		public Tuple2<LongValue, LongValue> theTuple;
+
+		public PojoContainingTupleAndWritable() {
+		}
+
+		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+			hadoopFan = new IntWritable(i);
+			someInt = new IntValue(i);
+			theTuple = new Tuple2<>(new LongValue(l1), new LongValue(l2));
+		}
+	}
+
+	/** Returns six records: (1, 10, 100) once and (2, 20, 200) five times. */
+	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		return env.fromCollection(data);
+	}
+
+	/** Returns six records: key 1 once, and key 2 five times with differing tuple values. */
+	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
+		return env.fromCollection(data);
+	}
+
+	/** Returns four tuples whose second and third fields are POJOs. */
+	public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+		List<Tuple3<IntValue, CrazyNested, POJO>> data = new ArrayList<>();
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		// POJO is not initialized according to the first two fields.
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(2), new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
+		return env.fromCollection(data);
+	}
+
+	/** POJO with two string fields. */
+	public static class Pojo1 {
+		public StringValue a;
+		public StringValue b;
+
+		public Pojo1() {}
+
+		public Pojo1(String a, String b) {
+			this.a = new StringValue(a);
+			this.b = new StringValue(b);
+		}
+	}
+
+	/** POJO with two string fields named differently from {@link Pojo1}. */
+	public static class Pojo2 {
+		public StringValue a2;
+		public StringValue b2;
+	}
+
+	/** POJO containing two other POJOs and an int field. */
+	public static class PojoWithMultiplePojos {
+		public Pojo1 p1;
+		public Pojo2 p2;
+		public IntValue i0;
+
+		public PojoWithMultiplePojos() {
+		}
+
+		public PojoWithMultiplePojos(String a, String b, String a1, String b1, int i0) {
+			p1 = new Pojo1();
+			p1.a = new StringValue(a);
+			p1.b = new StringValue(b);
+			p2 = new Pojo2();
+			p2.a2 = new StringValue(a1);
+			p2.b2 = new StringValue(b1);
+			this.i0 = new IntValue(i0);
+		}
+	}
+
+	/** Returns six records: i0 = 1 once, i0 = 2 three times and i0 = 3 twice. */
+	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
+		List<PojoWithMultiplePojos> data = new ArrayList<>();
+		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		return env.fromCollection(data);
+	}
+
+	public enum Category {
+		CAT_A, CAT_B;
+	}
+
+	/** POJO with a {@link Date} field and an enum field. */
+	public static class PojoWithDateAndEnum {
+		public StringValue group;
+		public Date date;
+		public Category cat;
+	}
+
+	/** Returns three records: two in group "a" with CAT_A and one in group "b" with CAT_B, all with the same date. */
+	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
+		List<PojoWithDateAndEnum> data = new ArrayList<>();
+
+		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+		one.group = new StringValue("a");
+		one.date = new Date(666);
+		one.cat = Category.CAT_A;
+		data.add(one);
+
+		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+		two.group = new StringValue("a");
+		two.date = new Date(666);
+		two.cat = Category.CAT_A;
+		data.add(two);
+
+		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+		three.group = new StringValue("b");
+		three.date = new Date(666);
+		three.cat = Category.CAT_B;
+		data.add(three);
+
+		return env.fromCollection(data);
+	}
+
+	/** POJO holding collections, big numbers, a SQL date and a list of mixed objects. */
+	public static class PojoWithCollection {
+		public List<Pojo1> pojos;
+		public IntValue key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	/** Same fields as {@link PojoWithCollection} plus a private field without accessors. */
+	public static class PojoWithCollectionGeneric {
+		public List<Pojo1> pojos;
+		public IntValue key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+		private PojoWithDateAndEnum makeMeGeneric;
+
+		// NOTE(review): toString() labels this class "PojoWithCollection"; left unchanged in case callers match on it.
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	/** Returns two {@link PojoWithCollection} records with key 0; only the first has a non-null mixed list. */
+	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+		List<PojoWithCollection> data = new ArrayList<>();
+
+		List<Pojo1> pojosList1 = new ArrayList<>();
+		pojosList1.add(new Pojo1("a", "aa"));
+		pojosList1.add(new Pojo1("b", "bb"));
+
+		List<Pojo1> pojosList2 = new ArrayList<>();
+		pojosList2.add(new Pojo1("a2", "aa2"));
+		pojosList2.add(new Pojo1("b2", "bb2"));
+
+		PojoWithCollection pwc1 = new PojoWithCollection();
+		pwc1.pojos = pojosList1;
+		pwc1.key = new IntValue(0);
+		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc1.scalaBigInt = BigInt.int2bigInt(10);
+		pwc1.bigDecimalKeepItNull = null;
+
+		// use calendar to make it stable across time zones
+		// (month written as decimal 4 rather than the octal literal 04; same value)
+		GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
+		pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+		pwc1.mixed = new ArrayList<>();
+		Map<StringValue, IntValue> map = new HashMap<>();
+		map.put(new StringValue("someKey"), new IntValue(1));
+		pwc1.mixed.add(map);
+		pwc1.mixed.add(new File("/this/is/wrong"));
+		pwc1.mixed.add("uhlala");
+
+		PojoWithCollection pwc2 = new PojoWithCollection();
+		pwc2.pojos = pojosList2;
+		pwc2.key = new IntValue(0);
+		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+		pwc2.bigDecimalKeepItNull = null;
+
+		GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+		pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+		data.add(pwc1);
+		data.add(pwc2);
+
+		return env.fromCollection(data);
+	}
+
+}
+