Repository: flink
Updated Branches:
  refs/heads/release-0.9 80d3478c0 -> ec3b98327
[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec3b9832 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec3b9832 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec3b9832 Branch: refs/heads/release-0.9 Commit: ec3b983276e85e0baed98ffa8acf6709f20032de Parents: 80d3478 Author: twalthr <twal...@apache.org> Authored: Tue Aug 4 15:30:28 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Aug 5 16:25:32 2015 +0200 ---------------------------------------------------------------------- .../flink/api/java/typeutils/TypeExtractor.java | 53 ++++++++++++---- .../type/extractor/PojoTypeExtractionTest.java | 63 ++++++++++++++++++++ 2 files changed, 104 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 41644f9..1ae8d3d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.avro.specific.SpecificRecordBase; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -66,15 +64,33 @@ import com.google.common.base.Preconditions; * functions. 
*/ public class TypeExtractor { + + /* + * NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy". + * The type hierarchy describes all types (Classes, ParameterizedTypes, TypeVariables etc. ) and intermediate + * types from a given type of a function or type (e.g. MyMapper, Tuple2) until a current type + * (depends on the method, e.g. MyPojoFieldType). + * + * Thus, it fully qualifies types until tuple/POJO field level. + * + * A typical typeHierarchy could look like: + * + * UDF: MyMapFunction.class + * top-level UDF: MyMapFunctionBase.class + * RichMapFunction: RichMapFunction.class + * MapFunction: MapFunction.class + * Function's OUT: Tuple1<MyPojo> + * user-defined POJO: MyPojo.class + * user-defined top-level POJO: MyPojoBase.class + * POJO field: Tuple1<String> + * Field type: String.class + * + */ private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); - // We need this to detect recursive types and not get caught - // in an endless recursion - private Set<Class<?>> alreadySeen; - protected TypeExtractor() { - alreadySeen = new HashSet<Class<?>>(); + // only create instances for special use cases } // -------------------------------------------------------------------------------------------- @@ -416,10 +432,12 @@ public class TypeExtractor { TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length]; for (int i = 0; i < subtypes.length; i++) { + ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy); + subTypeHierarchy.add(subtypes[i]); // sub type could not be determined with materializing // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt if (subtypes[i] instanceof TypeVariable<?>) { - tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, in2Type); + tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type); // variable could 
not be determined if (tupleSubTypes[i] == null) { @@ -430,7 +448,7 @@ public class TypeExtractor { + "all variables in the return type can be deduced from the input type(s)."); } } else { - tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), subtypes[i], in1Type, in2Type); + tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type); } } @@ -912,6 +930,19 @@ public class TypeExtractor { // -------------------------------------------------------------------------------------------- // Utility methods // -------------------------------------------------------------------------------------------- + + /** + * @return number of items with equal type or same raw type + */ + private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) { + int count = 0; + for (Type t : typeHierarchy) { + if (t == type || (isClassType(type) && t == typeToClass(type))) { + count++; + } + } + return count; + } /** * @param curT : start type @@ -1183,12 +1214,10 @@ public class TypeExtractor { return (TypeInformation<OUT>) new AvroTypeInfo(clazz); } - if (alreadySeen.contains(clazz)) { + if (countTypeInHierarchy(typeHierarchy, clazz) > 1) { return new GenericTypeInfo<OUT>(clazz); } - alreadySeen.add(clazz); - if (Modifier.isInterface(clazz.getModifiers())) { // Interface has no members and is therefore not handled as POJO return new GenericTypeInfo<OUT>(clazz); http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 1f3f71c..34fde20 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -810,4 +810,67 @@ public class PojoTypeExtractionTest { + ">")); Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); } + + public static class RecursivePojo1 { + public RecursivePojo1 field; + } + + public static class RecursivePojo2 { + public Tuple1<RecursivePojo2> field; + } + + public static class RecursivePojo3 { + public NestedPojo field; + } + + public static class NestedPojo { + public RecursivePojo3 field; + } + + @Test + public void testRecursivePojo1() { + TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class); + Assert.assertTrue(ti instanceof PojoTypeInfo); + Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass()); + } + + @Test + public void testRecursivePojo2() { + TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class); + Assert.assertTrue(ti instanceof PojoTypeInfo); + PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0); + Assert.assertTrue(pf.type instanceof TupleTypeInfo); + Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass()); + } + + @Test + public void testRecursivePojo3() { + TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class); + Assert.assertTrue(ti instanceof PojoTypeInfo); + PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0); + Assert.assertTrue(pf.type instanceof PojoTypeInfo); + Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass()); + } + + public static class FooBarPojo { + public int foo, bar; + public FooBarPojo() {} + } + + public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> { + @Override + public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception { + return null; + } + } + 
+ @Test + public void testDualUseOfPojo() { + MapFunction<?, ?> function = new DuplicateMapper(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class)); + Assert.assertTrue(ti instanceof TupleTypeInfo); + TupleTypeInfo<?> tti = ((TupleTypeInfo) ti); + Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo); + Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo); + } }