Added expression keys to the distinct and partition operators and addressed some of the comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/aca6fbcd Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/aca6fbcd Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/aca6fbcd Branch: refs/heads/master Commit: aca6fbcd6034cd3d700734f2c50a1084a9c22eb9 Parents: 926f835 Author: Robert Metzger <[email protected]> Authored: Sun Oct 5 19:30:18 2014 +0200 Committer: Robert Metzger <[email protected]> Committed: Wed Oct 8 11:39:01 2014 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 26 + .../flink/api/java/SerializationSpeedTest.java | 150 ------ .../api/java/operators/DistinctOperator.java | 5 +- .../api/java/operators/PartitionOperator.java | 5 +- .../flink/api/java/typeutils/TypeExtractor.java | 3 +- .../type/extractor/PojoTypeExtractionTest.java | 488 +++++++++++++++++++ .../java/type/extractor/TypeExtractorTest.java | 456 +---------------- .../runtime/FieldAccessMinibenchmark.java | 150 ++++++ .../typeutils/runtime/PojoComparatorTest.java | 64 +++ .../typeutils/runtime/PojoContainingTuple.java | 44 ++ .../runtime/PojoGenericTypeSerializerTest.java | 32 ++ .../test/javaApiOperators/DistinctITCase.java | 47 +- .../test/javaApiOperators/PartitionITCase.java | 41 +- .../util/CollectionDataSets.java | 13 + 14 files changed, 915 insertions(+), 609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 424d30b..dd8c4ba 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -434,6 +434,20 @@ public abstract class DataSet<T> { } /** + * Returns a distinct set of a {@link Tuple} or POJO {@link DataSet} using field expression keys. + * <p/> + * The field expressions specify the fields of Tuples or Pojos on which the decision is made if two elements are distinct or + * not. + * <p/> + * + * @param fields One or more field expressions on which the distinction of the DataSet is decided. + * @return A DistinctOperator that represents the distinct DataSet. + */ + public DistinctOperator<T> distinct(String... fields) { + return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fields, getType())); + } + + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple. * <p/> * Note: This operator can only be applied to Tuple DataSets. @@ -867,6 +881,18 @@ public abstract class DataSet<T> { } /** + * Hash-partitions a DataSet on the specified field expressions. + * <p> + * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time. + * + * @param fields The field expressions on which the DataSet is hash-partitioned. + * @return The partitioned DataSet. + */ + public PartitionOperator<T> partitionByHash(String... fields) { + return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<T>(fields, getType())); + } + + /** * Partitions a DataSet using the specified KeySelector. * <p> * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java b/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java deleted file mode 100644 index a81f55d..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/SerializationSpeedTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.java; - -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.lang.reflect.Field; - -public class SerializationSpeedTest { - - static Field wordDescField; - static Field wordField; - static { - try { - wordDescField = WC.class.getField("wordDesc"); - wordField = ComplexWordDescriptor.class.getField("word"); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public static class ComplexWordDescriptor { - public String word; - - public String getWord() { - return word; - } - } - - public static class WC { - public int count; - public ComplexWordDescriptor wordDesc; - - public WC(int c, String s) throws NoSuchFieldException, - SecurityException { - this.count = c; - this.wordDesc = new ComplexWordDescriptor(); - this.wordDesc.word = s; - } - - public ComplexWordDescriptor getWordDesc() { - return wordDesc; - } - - } - - public static int compareCodeGenPublicFields(WC w1, WC w2) { - return w1.wordDesc.word.compareTo(w2.wordDesc.word); - } - - public static int compareCodeGenMethods(WC w1, WC w2) { - return w1.getWordDesc().getWord().compareTo(w2.getWordDesc().getWord()); - } - - public static int compareReflective(WC w1, WC w2) - throws IllegalArgumentException, IllegalAccessException { - // get String of w1 - Object wordDesc1 = wordDescField.get(w1); - String word2cmp1 = (String) wordField.get(wordDesc1); - - // get String of w2 - Object wordDesc2 = wordDescField.get(w2); - String word2cmp2 = (String) wordField.get(wordDesc2); - - return word2cmp1.compareTo(word2cmp2); - } - - /** - * results on Core i7 2600k - * - * - * warming up Code gen 5019 Reflection 20364 Factor = 4.057382 - */ - public static void main(String[] args) throws NoSuchFieldException, - SecurityException, IllegalArgumentException, IllegalAccessException { - final long RUNS = 1000000000L; - - final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); - String jvm = bean.getVmName() + " - " + 
bean.getVmVendor() + " - " - + bean.getSpecVersion() + '/' + bean.getVmVersion(); - System.err.println("Jvm info : " + jvm); - - WC word0 = new WC(14, "Hallo"); - WC word1 = new WC(3, "Hola"); - - System.err.println("warming up"); - for (long i = 0; i < 100000000; i++) { - compareCodeGenPublicFields(word0, word1); - compareCodeGenMethods(word0, word1); - compareReflective(word0, word1); - } - - System.err.println("Code gen public fields"); - long startTime = System.currentTimeMillis(); - for (long i = 0; i < RUNS; i++) { - int a = compareCodeGenPublicFields(word0, word1); - if (a == 0) { - System.err.println("hah"); - } - } - long stopTime = System.currentTimeMillis(); - long elapsedTimeGen = stopTime - startTime; - System.err.println(elapsedTimeGen); - - System.err.println("Code gen methods"); - startTime = System.currentTimeMillis(); - for (long i = 0; i < RUNS; i++) { - int a = compareCodeGenPublicFields(word0, word1); - if (a == 0) { - System.err.println("hah"); - } - } - stopTime = System.currentTimeMillis(); - long elapsedTimeGenMethods = stopTime - startTime; - System.err.println(elapsedTimeGenMethods); - - System.err.println("Reflection"); - - startTime = System.currentTimeMillis(); - for (long i = 0; i < RUNS; i++) { - int a = compareReflective(word0, word1); - if (a == 0) { - System.err.println("hah"); - } - } - stopTime = System.currentTimeMillis(); - long elapsedTimeRef = stopTime - startTime; - System.err.println(elapsedTimeRef); - - System.err.println("Factor vs public = " - + (elapsedTimeRef / (float) elapsedTimeGen)); - System.err.println("Factor vs methods = " - + (elapsedTimeRef / (float) elapsedTimeGenMethods)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index 54a65a9..18fd756 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; @@ -67,8 +68,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera // FieldPositionKeys can only be applied on Tuples - if (keys instanceof Keys.ExpressionKeys && !input.getType().isTupleType()) { - throw new InvalidProgramException("Distinction on field positions is only possible on tuple data types."); + if (keys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) { + throw new InvalidProgramException("Distinction on field positions is only possible on composite type DataSets."); } this.keys = keys; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index 532e464..c4548fb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.base.PartitionOperatorBase; import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.KeyRemovingMapper; @@ -50,8 +51,8 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition throw new UnsupportedOperationException("Range Partitioning not yet supported"); } - if(pKeys instanceof Keys.ExpressionKeys<?> && !input.getType().isTupleType()) { - throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets"); + if(pKeys instanceof Keys.ExpressionKeys<?> && !(input.getType() instanceof CompositeType) ) { + throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Composite-type DataSets"); } this.pMethod = pMethod; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index bc92cdd..6231a74 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -399,8 +399,7 @@ public class TypeExtractor { } // objects with generics are treated as raw type else if (t instanceof ParameterizedType) { - 
return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), new ArrayList<Type>()); // pass new type hierarchies here because - // while creating the TH here, we assumed a tuple type. + return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), typeHierarchy); } // no tuple, no TypeVariable, no generic type else if (t instanceof Class) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java new file mode 100644 index 0000000..34dbd99 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.type.extractor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable; +import org.apache.flink.api.java.typeutils.WritableTypeInfo; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.HashMultiset; + +/** + * Pojo Type tests + * + * A Pojo is a bean-style class with getters, setters and empty ctor + * OR a class with all fields public (or for every private field, there has to be a public getter/setter) + * everything else is a generic type (that can't be used for field selection) + */ +public class PojoTypeExtractionTest { + + + // test with correct pojo types + public static class WC { // is a pojo + public ComplexNestedClass complex; // is a pojo + private int count; // is a BasicType + + public WC() { + } + public int getCount() { + return count; + } + public void setCount(int c) { + this.count = c; + } + } + public static class ComplexNestedClass { // pojo + public static int ignoreStaticField; + public transient int ignoreTransientField; + public Date date; // generic type + public Integer someNumber; // BasicType + public float someFloat; // BasicType + public Tuple3<Long, Long, String> word; 
//Tuple Type with three basic types + public Object nothing; // generic type + public MyWritable hadoopCitizen; // writableType + } + + // all public test + public static class AllPublic extends ComplexNestedClass { + public ArrayList<String> somethingFancy; // generic type + public HashMultiset<Integer> fancyIds; // generic type + public String[] fancyArray; // generic type + } + + public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> { + public String field3; + } + public static class PojoWithGenerics<T1, T2> { + public int key; + public T1 field1; + public T2 field2; + } + + public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {} + public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {} + + // extends from Tuple and adds a field + public static class FromTuple extends Tuple3<String, String, Long> { + private static final long serialVersionUID = 1L; + public int special; + } + + public static class IncorrectPojo { + private int isPrivate; + public int getIsPrivate() { + return isPrivate; + } + // setter is missing (intentional) + } + + // correct pojo + public static class BeanStylePojo { + public String abc; + private int field; + public int getField() { + return this.field; + } + public void setField(int f) { + this.field = f; + } + } + public static class WrongCtorPojo { + public int a; + public WrongCtorPojo(int a) { + this.a = a; + } + } + + // in this test, the location of the getters and setters is mixed across the type hierarchy. 
+ public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> { + public void setPackageProtected(String in) { + this.packageProtected = in; + } + } + public static class GenericPojoGetterSetterCheck<T> { + T packageProtected; + public T getPackageProtected() { + return packageProtected; + } + } + + @Test + public void testIncorrectPojos() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class); + Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); + + typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class); + Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); + } + + @Test + public void testCorrectPojos() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + + typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + } + + @Test + public void testPojoWC() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class); + checkWCPojoAsserts(typeForClass); + + WC t = new WC(); + t.complex = new ComplexNestedClass(); + TypeInformation<?> typeForObject = TypeExtractor.getForObject(t); + checkWCPojoAsserts(typeForObject); + } + + private void checkWCPojoAsserts(TypeInformation<?> typeInfo) { + Assert.assertFalse(typeInfo.isBasicType()); + Assert.assertFalse(typeInfo.isTupleType()); + Assert.assertEquals(9, typeInfo.getTotalFields()); + Assert.assertTrue(typeInfo instanceof PojoTypeInfo); + PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo; + + List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); + String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing", + "complex.someFloat", "complex.someNumber", "complex.word.f0", + "complex.word.f1", "complex.word.f2"}; + int[] positions = {8,0,1,2, + 3,4,5, + 6,7}; + 
Assert.assertEquals(fields.length, positions.length); + for(int i = 0; i < fields.length; i++) { + pojoType.getKey(fields[i], 0, ffd); + Assert.assertEquals("Too many keys returned", 1, ffd.size()); + Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition()); + ffd.clear(); + } + + pojoType.getKey("complex.word.*", 0, ffd); + Assert.assertEquals(3, ffd.size()); + // check if it returns 5,6,7 + for(FlatFieldDescriptor ffdE : ffd) { + final int pos = ffdE.getPosition(); + Assert.assertTrue(pos <= 7 ); + Assert.assertTrue(5 <= pos ); + if(pos == 5) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 6) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 7) { + Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); + } + } + ffd.clear(); + + + pojoType.getKey("complex.*", 0, ffd); + Assert.assertEquals(8, ffd.size()); + // check if it returns 0-7 + for(FlatFieldDescriptor ffdE : ffd) { + final int pos = ffdE.getPosition(); + Assert.assertTrue(ffdE.getPosition() <= 7 ); + Assert.assertTrue(0 <= ffdE.getPosition() ); + if(pos == 0) { + Assert.assertEquals(Date.class, ffdE.getType().getTypeClass()); + } + if(pos == 1) { + Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass()); + } + if(pos == 2) { + Assert.assertEquals(Object.class, ffdE.getType().getTypeClass()); + } + if(pos == 3) { + Assert.assertEquals(Float.class, ffdE.getType().getTypeClass()); + } + if(pos == 4) { + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + } + if(pos == 5) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 6) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 7) { + Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); + } + } + ffd.clear(); + + pojoType.getKey("*", 0, ffd); + Assert.assertEquals(9, ffd.size()); + // check if it returns 0-8 + 
for(FlatFieldDescriptor ffdE : ffd) { + Assert.assertTrue(ffdE.getPosition() <= 8 ); + Assert.assertTrue(0 <= ffdE.getPosition() ); + if(ffdE.getPosition() == 8) { + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + } + } + ffd.clear(); + + TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex + Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo); + + Assert.assertEquals(6, typeComplexNested.getArity()); + Assert.assertEquals(8, typeComplexNested.getTotalFields()); + PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested; + + boolean dateSeen = false, intSeen = false, floatSeen = false, + tupleSeen = false, objectSeen = false, writableSeen = false; + for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) { + PojoField field = pojoTypeComplexNested.getPojoFieldAt(i); + String name = field.field.getName(); + if(name.equals("date")) { + if(dateSeen) { + Assert.fail("already seen"); + } + dateSeen = true; + Assert.assertEquals(new GenericTypeInfo<Date>(Date.class), field.type); + Assert.assertEquals(Date.class, field.type.getTypeClass()); + } else if(name.equals("someNumber")) { + if(intSeen) { + Assert.fail("already seen"); + } + intSeen = true; + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(Integer.class, field.type.getTypeClass()); + } else if(name.equals("someFloat")) { + if(floatSeen) { + Assert.fail("already seen"); + } + floatSeen = true; + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type); + Assert.assertEquals(Float.class, field.type.getTypeClass()); + } else if(name.equals("word")) { + if(tupleSeen) { + Assert.fail("already seen"); + } + tupleSeen = true; + Assert.assertTrue(field.type instanceof TupleTypeInfo<?>); + Assert.assertEquals(Tuple3.class, field.type.getTypeClass()); + // do some more advanced checks on the tuple + TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type; + 
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2)); + } else if(name.equals("nothing")) { + if(objectSeen) { + Assert.fail("already seen"); + } + objectSeen = true; + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); + Assert.assertEquals(Object.class, field.type.getTypeClass()); + } else if(name.equals("hadoopCitizen")) { + if(writableSeen) { + Assert.fail("already seen"); + } + writableSeen = true; + Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type); + Assert.assertEquals(MyWritable.class, field.type.getTypeClass()); + } else { + Assert.fail("field "+field+" is not expected"); + } + } + Assert.assertTrue("Field was not present", dateSeen); + Assert.assertTrue("Field was not present", intSeen); + Assert.assertTrue("Field was not present", floatSeen); + Assert.assertTrue("Field was not present", tupleSeen); + Assert.assertTrue("Field was not present", objectSeen); + Assert.assertTrue("Field was not present", writableSeen); + + TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count + Assert.assertTrue(typeAtOne instanceof BasicTypeInfo); + + Assert.assertEquals(typeInfo.getTypeClass(), WC.class); + Assert.assertEquals(typeInfo.getArity(), 2); + } + + @Test + public void testPojoAllPublic() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class); + checkAllPublicAsserts(typeForClass); + + TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() ); + checkAllPublicAsserts(typeForObject); + } + + private void checkAllPublicAsserts(TypeInformation<?> typeInformation) { + Assert.assertTrue(typeInformation instanceof PojoTypeInfo); + Assert.assertEquals(9, typeInformation.getArity()); + Assert.assertEquals(11, 
typeInformation.getTotalFields()); + // check if the three additional fields are identified correctly + boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false; + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.field.getName(); + if(name.equals("somethingFancy")) { + if(arrayListSeen) { + Assert.fail("already seen"); + } + arrayListSeen = true; + Assert.assertTrue(field.type instanceof GenericTypeInfo); + Assert.assertEquals(ArrayList.class, field.type.getTypeClass()); + } else if(name.equals("fancyIds")) { + if(multisetSeen) { + Assert.fail("already seen"); + } + multisetSeen = true; + Assert.assertTrue(field.type instanceof GenericTypeInfo); + Assert.assertEquals(HashMultiset.class, field.type.getTypeClass()); + } else if(name.equals("fancyArray")) { + if(strArraySeen) { + Assert.fail("already seen"); + } + strArraySeen = true; + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type); + Assert.assertEquals(String[].class, field.type.getTypeClass()); + } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) { + // ignore these, they are inherited from the ComplexNestedClass + } + else { + Assert.fail("field "+field+" is not expected"); + } + } + Assert.assertTrue("Field was not present", arrayListSeen); + Assert.assertTrue("Field was not present", multisetSeen); + Assert.assertTrue("Field was not present", strArraySeen); + } + + @Test + public void testPojoExtendingTuple() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class); + checkFromTuplePojo(typeForClass); + + FromTuple ft = new FromTuple(); + ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L; + TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft); + checkFromTuplePojo(typeForObject); + } + + private void 
checkFromTuplePojo(TypeInformation<?> typeInformation) { + Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>); + Assert.assertEquals(4, typeInformation.getTotalFields()); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.field.getName(); + if(name.equals("special")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + } else if(name.equals("f0") || name.equals("f1")) { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); + } else if(name.equals("f2")) { + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); + } else { + Assert.fail("unexpected field"); + } + } + } + + @Test + public void testPojoWithGenerics() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.field.getName(); + if(name.equals("field1")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + } else if (name.equals("field2")) { + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); + } else if (name.equals("field3")) { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + /** + * Test if the TypeExtractor is accepting untyped generics, + * making them GenericTypes + */ + @Test + @Ignore // kryo needed. 
+ public void testPojoWithGenericsSomeFieldsGeneric() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.field.getName(); + if(name.equals("field1")) { + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); + } else if (name.equals("field2")) { + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + + + @Test + public void testPojoWithComplexHierarchy() { + TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class); + Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); + PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; + for(int i = 0; i < pojoTypeForClass.getArity(); i++) { + PojoField field = pojoTypeForClass.getPojoFieldAt(i); + String name = field.field.getName(); + if(name.equals("field1")) { + Assert.assertTrue(field.type instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!) 
+ } else if (name.equals("field2")) { + Assert.assertTrue(field.type instanceof TupleTypeInfo<?>); + Assert.assertTrue( ((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) ); + } else if (name.equals("key")) { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + } else { + Assert.fail("Unexpected field "+field); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index 60d41d3..a092a53 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -22,8 +22,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; import java.util.List; import org.apache.flink.api.common.functions.InvalidTypesException; @@ -46,9 +44,7 @@ import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple9; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.PojoField; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -67,7 +63,6 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import com.google.common.collect.HashMultiset; public 
class TypeExtractorTest { @@ -350,449 +345,7 @@ public class TypeExtractorTest { Assert.assertEquals(ti2.getTypeClass(), CustomType.class); } - // - // Pojo Type tests - // A Pojo is a bean-style class with getters, setters and empty ctor - // OR a class with all fields public (or for every private field, there has to be a public getter/setter) - // everything else is a generic type (that can't be used for field selection) - // - - - // test with correct pojo types - public static class WC { // is a pojo - public ComplexNestedClass complex; // is a pojo - private int count; // is a BasicType - - public WC() { - } - public int getCount() { - return count; - } - public void setCount(int c) { - this.count = c; - } - } - public static class ComplexNestedClass { // pojo - public static int ignoreStaticField; - public transient int ignoreTransientField; - public Date date; // generic type - public Integer someNumber; // BasicType - public float someFloat; // BasicType - public Tuple3<Long, Long, String> word; //Tuple Type with three basic types - public Object nothing; // generic type - public MyWritable hadoopCitizen; // writableType - } - // all public test - public static class AllPublic extends ComplexNestedClass { - public ArrayList<String> somethingFancy; // generic type - public HashMultiset<Integer> fancyIds; // generic type - public String[] fancyArray; // generic type - } - - public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> { - public String field3; - } - public static class PojoWithGenerics<T1, T2> { - public int key; - public T1 field1; - public T2 field2; - } - - public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {} - public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {} - - // extends from Tuple and adds a field - public static class FromTuple extends Tuple3<String, String, Long> { - private static final long serialVersionUID = 1L; - public int special; - } - - public 
static class IncorrectPojo { - private int isPrivate; - public int getIsPrivate() { - return isPrivate; - } - // setter is missing (intentional) - } - - // correct pojo - public static class BeanStylePojo { - public String abc; - private int field; - public int getField() { - return this.field; - } - public void setField(int f) { - this.field = f; - } - } - public static class WrongCtorPojo { - public int a; - public WrongCtorPojo(int a) { - this.a = a; - } - } - - // in this test, the location of the getters and setters is mixed across the type hierarchy. - public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> { - public void setPackageProtected(String in) { - this.packageProtected = in; - } - } - public static class GenericPojoGetterSetterCheck<T> { - T packageProtected; - public T getPackageProtected() { - return packageProtected; - } - } - - @Test - public void testIncorrectPojos() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class); - Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); - - typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class); - Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); - } - - @Test - public void testCorrectPojos() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - - typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - } - - @Test - public void testPojoWC() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class); - checkWCPojoAsserts(typeForClass); - - WC t = new WC(); - t.complex = new ComplexNestedClass(); - TypeInformation<?> typeForObject = TypeExtractor.getForObject(t); - checkWCPojoAsserts(typeForObject); - } - - private void checkWCPojoAsserts(TypeInformation<?> typeInfo) { - 
Assert.assertFalse(typeInfo.isBasicType()); - Assert.assertFalse(typeInfo.isTupleType()); - Assert.assertEquals(9, typeInfo.getTotalFields()); - Assert.assertTrue(typeInfo instanceof PojoTypeInfo); - PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo; - - List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); - String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing", - "complex.someFloat", "complex.someNumber", "complex.word.f0", - "complex.word.f1", "complex.word.f2"}; - int[] positions = {8,0,1,2, - 3,4,5, - 6,7}; - Assert.assertEquals(fields.length, positions.length); - for(int i = 0; i < fields.length; i++) { - pojoType.getKey(fields[i], 0, ffd); - Assert.assertEquals("Too many keys returned", 1, ffd.size()); - Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition()); - ffd.clear(); - } - - pojoType.getKey("complex.word.*", 0, ffd); - Assert.assertEquals(3, ffd.size()); - // check if it returns 5,6,7 - for(FlatFieldDescriptor ffdE : ffd) { - final int pos = ffdE.getPosition(); - Assert.assertTrue(pos <= 7 ); - Assert.assertTrue(5 <= pos ); - if(pos == 5) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 6) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 7) { - Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); - } - } - ffd.clear(); - - - pojoType.getKey("complex.*", 0, ffd); - Assert.assertEquals(8, ffd.size()); - // check if it returns 0-7 - for(FlatFieldDescriptor ffdE : ffd) { - final int pos = ffdE.getPosition(); - Assert.assertTrue(ffdE.getPosition() <= 7 ); - Assert.assertTrue(0 <= ffdE.getPosition() ); - if(pos == 0) { - Assert.assertEquals(Date.class, ffdE.getType().getTypeClass()); - } - if(pos == 1) { - Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass()); - } - if(pos == 2) { - Assert.assertEquals(Object.class, ffdE.getType().getTypeClass()); - } - if(pos == 
3) { - Assert.assertEquals(Float.class, ffdE.getType().getTypeClass()); - } - if(pos == 4) { - Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); - } - if(pos == 5) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 6) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 7) { - Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); - } - } - ffd.clear(); - - pojoType.getKey("*", 0, ffd); - Assert.assertEquals(9, ffd.size()); - // check if it returns 0-8 - for(FlatFieldDescriptor ffdE : ffd) { - Assert.assertTrue(ffdE.getPosition() <= 8 ); - Assert.assertTrue(0 <= ffdE.getPosition() ); - if(ffdE.getPosition() == 8) { - Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); - } - } - ffd.clear(); - - TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex - Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo); - - Assert.assertEquals(6, typeComplexNested.getArity()); - Assert.assertEquals(8, typeComplexNested.getTotalFields()); - PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested; - - boolean dateSeen = false, intSeen = false, floatSeen = false, - tupleSeen = false, objectSeen = false, writableSeen = false; - for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) { - PojoField field = pojoTypeComplexNested.getPojoFieldAt(i); - String name = field.field.getName(); - if(name.equals("date")) { - if(dateSeen) { - Assert.fail("already seen"); - } - dateSeen = true; - Assert.assertEquals(new GenericTypeInfo<Date>(Date.class), field.type); - Assert.assertEquals(Date.class, field.type.getTypeClass()); - } else if(name.equals("someNumber")) { - if(intSeen) { - Assert.fail("already seen"); - } - intSeen = true; - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - Assert.assertEquals(Integer.class, field.type.getTypeClass()); - } else if(name.equals("someFloat")) { - if(floatSeen) { - 
Assert.fail("already seen"); - } - floatSeen = true; - Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type); - Assert.assertEquals(Float.class, field.type.getTypeClass()); - } else if(name.equals("word")) { - if(tupleSeen) { - Assert.fail("already seen"); - } - tupleSeen = true; - Assert.assertTrue(field.type instanceof TupleTypeInfo<?>); - Assert.assertEquals(Tuple3.class, field.type.getTypeClass()); - // do some more advanced checks on the tuple - TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type; - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0)); - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1)); - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2)); - } else if(name.equals("nothing")) { - if(objectSeen) { - Assert.fail("already seen"); - } - objectSeen = true; - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); - Assert.assertEquals(Object.class, field.type.getTypeClass()); - } else if(name.equals("hadoopCitizen")) { - if(writableSeen) { - Assert.fail("already seen"); - } - writableSeen = true; - Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type); - Assert.assertEquals(MyWritable.class, field.type.getTypeClass()); - } else { - Assert.fail("field "+field+" is not expected"); - } - } - Assert.assertTrue("Field was not present", dateSeen); - Assert.assertTrue("Field was not present", intSeen); - Assert.assertTrue("Field was not present", floatSeen); - Assert.assertTrue("Field was not present", tupleSeen); - Assert.assertTrue("Field was not present", objectSeen); - Assert.assertTrue("Field was not present", writableSeen); - - TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count - Assert.assertTrue(typeAtOne instanceof BasicTypeInfo); - - Assert.assertEquals(typeInfo.getTypeClass(), WC.class); - 
Assert.assertEquals(typeInfo.getArity(), 2); - } - - @Test - public void testPojoAllPublic() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class); - checkAllPublicAsserts(typeForClass); - - TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() ); - checkAllPublicAsserts(typeForObject); - } - - private void checkAllPublicAsserts(TypeInformation<?> typeInformation) { - Assert.assertTrue(typeInformation instanceof PojoTypeInfo); - Assert.assertEquals(9, typeInformation.getArity()); - Assert.assertEquals(11, typeInformation.getTotalFields()); - // check if the three additional fields are identified correctly - boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false; - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); - if(name.equals("somethingFancy")) { - if(arrayListSeen) { - Assert.fail("already seen"); - } - arrayListSeen = true; - Assert.assertTrue(field.type instanceof GenericTypeInfo); - Assert.assertEquals(ArrayList.class, field.type.getTypeClass()); - } else if(name.equals("fancyIds")) { - if(multisetSeen) { - Assert.fail("already seen"); - } - multisetSeen = true; - Assert.assertTrue(field.type instanceof GenericTypeInfo); - Assert.assertEquals(HashMultiset.class, field.type.getTypeClass()); - } else if(name.equals("fancyArray")) { - if(strArraySeen) { - Assert.fail("already seen"); - } - strArraySeen = true; - Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type); - Assert.assertEquals(String[].class, field.type.getTypeClass()); - } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) { - // ignore these, they are inherited from the ComplexNestedClass - } - else { - Assert.fail("field "+field+" is not expected"); - } - } - 
Assert.assertTrue("Field was not present", arrayListSeen); - Assert.assertTrue("Field was not present", multisetSeen); - Assert.assertTrue("Field was not present", strArraySeen); - } - - @Test - public void testPojoExtendingTuple() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class); - checkFromTuplePojo(typeForClass); - - FromTuple ft = new FromTuple(); - ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L; - TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft); - checkFromTuplePojo(typeForObject); - } - - private void checkFromTuplePojo(TypeInformation<?> typeInformation) { - Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>); - Assert.assertEquals(4, typeInformation.getTotalFields()); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); - if(name.equals("special")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - } else if(name.equals("f0") || name.equals("f1")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); - } else if(name.equals("f2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); - } else { - Assert.fail("unexpected field"); - } - } - } - - @Test - public void testPojoWithGenerics() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); - if(name.equals("field1")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - } else if (name.equals("field2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); - } else if (name.equals("field3")) { - 
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - /** - * Test if the TypeExtractor is accepting untyped generics, - * making them GenericTypes - */ - @Test - @Ignore // kryo needed. - public void testPojoWithGenericsSomeFieldsGeneric() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); - if(name.equals("field1")) { - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); - } else if (name.equals("field2")) { - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - - @Test - public void testPojoWithComplexHierarchy() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); - if(name.equals("field1")) { - Assert.assertTrue(field.type instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!) 
- } else if (name.equals("field2")) { - Assert.assertTrue(field.type instanceof TupleTypeInfo<?>); - Assert.assertTrue( ((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) ); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - // End of Pojo type tests public static class CustomType { public String myField1; @@ -1691,20 +1244,21 @@ public class TypeExtractorTest { public T myField; } + public static class InType extends MyObject<String> {} @SuppressWarnings({ "rawtypes", "unchecked" }) @Test @Ignore public void testParamertizedCustomObject() { - RichMapFunction<?, ?> function = new RichMapFunction<MyObject<String>, MyObject<String>>() { + RichMapFunction<?, ?> function = new RichMapFunction<InType, MyObject<String>>() { private static final long serialVersionUID = 1L; @Override - public MyObject<String> map(MyObject<String> value) throws Exception { + public MyObject<String> map(InType value) throws Exception { return null; } }; - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$MyObject")); + TypeInformation<?> inType = TypeExtractor.createTypeInfo(InType.class); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inType); Assert.assertTrue(ti instanceof PojoTypeInfo); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java new file mode 100644 index 0000000..8a0ec82 --- /dev/null 
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/FieldAccessMinibenchmark.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.lang.reflect.Field; + +public class FieldAccessMinibenchmark { + + static Field wordDescField; + static Field wordField; + static { + try { + wordDescField = WC.class.getField("wordDesc"); + wordField = ComplexWordDescriptor.class.getField("word"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static class ComplexWordDescriptor { + public String word; + + public String getWord() { + return word; + } + } + + public static class WC { + public int count; + public ComplexWordDescriptor wordDesc; + + public WC(int c, String s) throws NoSuchFieldException, + SecurityException { + this.count = c; + this.wordDesc = new ComplexWordDescriptor(); + this.wordDesc.word = s; + } + + public ComplexWordDescriptor getWordDesc() { + return wordDesc; + } + + } + + public static int compareCodeGenPublicFields(WC w1, WC w2) { + return w1.wordDesc.word.compareTo(w2.wordDesc.word); + } + 
+ public static int compareCodeGenMethods(WC w1, WC w2) { + return w1.getWordDesc().getWord().compareTo(w2.getWordDesc().getWord()); + } + + public static int compareReflective(WC w1, WC w2) + throws IllegalArgumentException, IllegalAccessException { + // get String of w1 + Object wordDesc1 = wordDescField.get(w1); + String word2cmp1 = (String) wordField.get(wordDesc1); + + // get String of w2 + Object wordDesc2 = wordDescField.get(w2); + String word2cmp2 = (String) wordField.get(wordDesc2); + + return word2cmp1.compareTo(word2cmp2); + } + + /** + * results on Core i7 2600k + * + * + * warming up Code gen 5019 Reflection 20364 Factor = 4.057382 + */ + public static void main(String[] args) throws NoSuchFieldException, + SecurityException, IllegalArgumentException, IllegalAccessException { + final long RUNS = 1000000000L; + + final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); + String jvm = bean.getVmName() + " - " + bean.getVmVendor() + " - " + + bean.getSpecVersion() + '/' + bean.getVmVersion(); + System.err.println("Jvm info : " + jvm); + + WC word0 = new WC(14, "Hallo"); + WC word1 = new WC(3, "Hola"); + + System.err.println("warming up"); + for (long i = 0; i < 100000000; i++) { + compareCodeGenPublicFields(word0, word1); + compareCodeGenMethods(word0, word1); + compareReflective(word0, word1); + } + + System.err.println("Code gen public fields"); + long startTime = System.currentTimeMillis(); + for (long i = 0; i < RUNS; i++) { + int a = compareCodeGenPublicFields(word0, word1); + if (a == 0) { + System.err.println("hah"); + } + } + long stopTime = System.currentTimeMillis(); + long elapsedTimeGen = stopTime - startTime; + System.err.println(elapsedTimeGen); + + System.err.println("Code gen methods"); + startTime = System.currentTimeMillis(); + for (long i = 0; i < RUNS; i++) { + int a = compareCodeGenPublicFields(word0, word1); + if (a == 0) { + System.err.println("hah"); + } + } + stopTime = System.currentTimeMillis(); + long 
elapsedTimeGenMethods = stopTime - startTime; + System.err.println(elapsedTimeGenMethods); + + System.err.println("Reflection"); + + startTime = System.currentTimeMillis(); + for (long i = 0; i < RUNS; i++) { + int a = compareReflective(word0, word1); + if (a == 0) { + System.err.println("hah"); + } + } + stopTime = System.currentTimeMillis(); + long elapsedTimeRef = stopTime - startTime; + System.err.println(elapsedTimeRef); + + System.err.println("Factor vs public = " + + (elapsedTimeRef / (float) elapsedTimeGen)); + System.err.println("Factor vs methods = " + + (elapsedTimeRef / (float) elapsedTimeGenMethods)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java new file mode 100644 index 0000000..5cb31ca --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import java.util.Arrays; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Assert; +import org.junit.Ignore; + + +@Ignore // TODO +public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> { + TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class); + + PojoContainingTuple[] data = new PojoContainingTuple[]{ + new PojoContainingTuple(1, 1L, 1L), + new PojoContainingTuple(2, 2L, 2L), + new PojoContainingTuple(8519, 85190L, 85190L), + new PojoContainingTuple(-51498, 85191L, 85191L), + }; + + @Override + protected TypeComparator<PojoContainingTuple> createComparator(boolean ascending) { + Assert.assertTrue(type instanceof CompositeType); + CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type; + ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType); + boolean[] orders = new boolean[keys.getNumberOfKeyFields()]; + Arrays.fill(orders, true); + return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0); + } + + @Override + protected TypeSerializer<PojoContainingTuple> createSerializer() { + return type.createSerializer(); + } + + @Override + protected PojoContainingTuple[] getSortedTestData() { + return data; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java new file mode 100644 index 0000000..ca17ee0 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * This file belongs to the PojoComparatorTest test + * + */ +public class PojoContainingTuple { + public int someInt; + public String someString = "abc"; + public Tuple2<Long, Long> theTuple; + public PojoContainingTuple() {} + public PojoContainingTuple(int i, long l1, long l2) { + someInt = i; + theTuple = new Tuple2<Long, Long>(l1, l2); + } + + @Override + public boolean equals(Object obj) { + if(obj instanceof PojoContainingTuple) { + PojoContainingTuple other = (PojoContainingTuple) obj; + return someInt == other.someInt && theTuple.equals(other.theTuple); + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java new file mode 100644 index 0000000..a176178 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +public class PojoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { + + @Override + protected <T> TypeSerializer<T> createSerializer(Class<T> type) { + TypeInformation<T> typeInfo = TypeExtractor.getForClass(type); + return typeInfo.createSerializer(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java index 69bab75..54f1b58 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -33,6 
+34,7 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.JavaProgramTestBase; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -42,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class DistinctITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 6; + private static int NUM_PROGRAMS = 8; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -237,6 +239,49 @@ public class DistinctITCase extends JavaProgramTestBase { "5,2\n" + "5,3\n"; } + case 7: { + + /* + * check correctness of distinct on tuples with field expressions + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple1<Integer>> reduceDs = ds.union(ds) + .distinct("f0").project(0).types(Integer.class); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1\n" + + "2\n"; + + } + case 8: { + + /* + * check correctness of distinct on Pojos + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env); + DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new MapFunction<CollectionDataSets.POJO, Integer>() { + @Override + public Integer map(POJO value) throws Exception { + return (int) value.nestedPojo.longNumber; + } + }); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "10000\n20000\n30000\n"; + + } default: throw 
new IllegalArgumentException("Invalid program id"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java index bf1d404..d3c87fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; import org.junit.runner.RunWith; @@ -45,7 +46,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class PartitionITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 3; + private static int NUM_PROGRAMS = 4; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -224,6 +225,29 @@ public class PartitionITCase extends JavaProgramTestBase { "5\n" + "6\n"; } + case 4: { + /* + * Test hash partition with key expression + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(3); + + DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env); + DataSet<Long> uniqLongs = ds + .partitionByHash("nestedPojo.longNumber").setParallelism(4) + .mapPartition(new UniqueNestedPojoLongMapper()); + uniqLongs.writeAsText(resultPath); + + env.execute(); + 
+ // return expected result + return "10000\n" + + "20000\n" + + "30000\n"; + } + + default: throw new IllegalArgumentException("Invalid program id"); @@ -246,6 +270,21 @@ public class PartitionITCase extends JavaProgramTestBase { } } + public static class UniqueNestedPojoLongMapper implements MapPartitionFunction<POJO, Long> { + private static final long serialVersionUID = 1L; + + @Override + public void mapPartition(Iterable<POJO> records, Collector<Long> out) throws Exception { + HashSet<Long> uniq = new HashSet<Long>(); + for(POJO t : records) { + uniq.add(t.nestedPojo.longNumber); + } + for(Long l : uniq) { + out.collect(l); + } + } + } + public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aca6fbcd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index b657545..d023287 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -278,6 +278,19 @@ public class CollectionDataSets { return env.fromCollection(data); } + public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) { + List<POJO> data = new ArrayList<POJO>(); + data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); // 5x + data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); + data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); + data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); + data.add(new POJO(1, "First",10, 100, 1000L, "One", 
10000L)); + data.add(new POJO(2, "Second",20, 200, 2000L, "Two", 20000L)); + data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L)); // 2x + data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L)); + return env.fromCollection(data); + } + public static class POJO { public int number; public String str;
