Repository: flink
Updated Branches:
  refs/heads/release-0.9 80d3478c0 -> ec3b98327


[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has 
two fields of the same POJO type


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec3b9832
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec3b9832
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec3b9832

Branch: refs/heads/release-0.9
Commit: ec3b983276e85e0baed98ffa8acf6709f20032de
Parents: 80d3478
Author: twalthr <twal...@apache.org>
Authored: Tue Aug 4 15:30:28 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Aug 5 16:25:32 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 53 ++++++++++++----
 .../type/extractor/PojoTypeExtractionTest.java  | 63 ++++++++++++++++++++
 2 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 41644f9..1ae8d3d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -66,15 +64,33 @@ import com.google.common.base.Preconditions;
  * functions.
  */
 public class TypeExtractor {
+
+       /*
+        * NOTE: Most methods of the TypeExtractor work with a so-called 
"typeHierarchy".
+        * The type hierarchy describes all types (Classes, ParameterizedTypes, 
TypeVariables etc.) and intermediate
+        * types from a given type of a function or type (e.g. MyMapper, 
Tuple2) until a current type
+        * (depends on the method, e.g. MyPojoFieldType).
+        *
+        * Thus, it fully qualifies types until tuple/POJO field level.
+        *
+        * A typical typeHierarchy could look like:
+        *
+        * UDF: MyMapFunction.class
+        * top-level UDF: MyMapFunctionBase.class
+        * RichMapFunction: RichMapFunction.class
+        * MapFunction: MapFunction.class
+        * Function's OUT: Tuple1<MyPojo>
+        * user-defined POJO: MyPojo.class
+        * user-defined top-level POJO: MyPojoBase.class
+        * POJO field: Tuple1<String>
+        * Field type: String.class
+        *
+        */
        
        private static final Logger LOG = 
LoggerFactory.getLogger(TypeExtractor.class);
 
-       // We need this to detect recursive types and not get caught
-       // in an endless recursion
-       private Set<Class<?>> alreadySeen;
-
        protected TypeExtractor() {
-               alreadySeen = new HashSet<Class<?>>();
+               // only create instances for special use cases
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -416,10 +432,12 @@ public class TypeExtractor {
                        
                        TypeInformation<?>[] tupleSubTypes = new 
TypeInformation<?>[subtypes.length];
                        for (int i = 0; i < subtypes.length; i++) {
+                               ArrayList<Type> subTypeHierarchy = new 
ArrayList<Type>(typeHierarchy);
+                               subTypeHierarchy.add(subtypes[i]);
                                // sub type could not be determined with 
materializing
                                // try to derive the type info of the 
TypeVariable from the immediate base child input as a last attempt
                                if (subtypes[i] instanceof TypeVariable<?>) {
-                                       tupleSubTypes[i] = 
createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, 
in2Type);
+                                       tupleSubTypes[i] = 
createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, 
in1Type, in2Type);
                                        
                                        // variable could not be determined
                                        if (tupleSubTypes[i] == null) {
@@ -430,7 +448,7 @@ public class TypeExtractor {
                                                                + "all 
variables in the return type can be deduced from the input type(s).");
                                        }
                                } else {
-                                       tupleSubTypes[i] = 
createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), 
subtypes[i], in1Type, in2Type);
+                                       tupleSubTypes[i] = 
createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, 
in2Type);
                                }
                        }
                        
@@ -912,6 +930,19 @@ public class TypeExtractor {
        // 
--------------------------------------------------------------------------------------------
        //  Utility methods
        // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * @return number of hierarchy entries identical to the given type or to its raw class
+        */
+       private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, 
Type type) {
+               int count = 0;
+               for (Type t : typeHierarchy) {
+                       if (t == type || (isClassType(type) && t == 
typeToClass(type))) {
+                               count++;
+                       }
+               }
+               return count;
+       }
        
        /**
         * @param curT : start type
@@ -1183,12 +1214,10 @@ public class TypeExtractor {
                        return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
                }
 
-               if (alreadySeen.contains(clazz)) {
+               if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
                        return new GenericTypeInfo<OUT>(clazz);
                }
 
-               alreadySeen.add(clazz);
-
                if (Modifier.isInterface(clazz.getModifiers())) {
                        // Interface has no members and is therefore not 
handled as POJO
                        return new GenericTypeInfo<OUT>(clazz);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 1f3f71c..34fde20 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -810,4 +810,67 @@ public class PojoTypeExtractionTest {
                                                + ">"));
                Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
        }
+
+       public static class RecursivePojo1 {
+               public RecursivePojo1 field;
+       }
+
+       public static class RecursivePojo2 {
+               public Tuple1<RecursivePojo2> field;
+       }
+
+       public static class RecursivePojo3 {
+               public NestedPojo field;
+       }
+
+       public static class NestedPojo {
+               public RecursivePojo3 field;
+       }
+
+       @Test
+       public void testRecursivePojo1() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo1.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
ti).getPojoFieldAt(0).type.getClass());
+       }
+
+       @Test
+       public void testRecursivePojo2() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo2.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+               Assert.assertTrue(pf.type instanceof TupleTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) 
pf.type).getTypeAt(0).getClass());
+       }
+
+       @Test
+       public void testRecursivePojo3() {
+               TypeInformation<?> ti = 
TypeExtractor.createTypeInfo(RecursivePojo3.class);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+               Assert.assertTrue(pf.type instanceof PojoTypeInfo);
+               Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) 
pf.type).getPojoFieldAt(0).type.getClass());
+       }
+
+       public static class FooBarPojo {
+               public int foo, bar;
+               public FooBarPojo() {}
+       }
+
+       public static class DuplicateMapper implements MapFunction<FooBarPojo, 
Tuple2<FooBarPojo, FooBarPojo>> {
+               @Override
+               public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) 
throws Exception {
+                       return null;
+               }
+       }
+
+       @Test
+       public void testDualUseOfPojo() {
+               MapFunction<?, ?> function = new DuplicateMapper();
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeExtractor.createTypeInfo(FooBarPojo.class));
+               Assert.assertTrue(ti instanceof TupleTypeInfo);
+               TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
+               Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+               Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+       }
 }

Reply via email to