[FLINK-3566] [FLINK-3563] [core] TypeExtraction input type validation fixes

This closes #1759


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/434e88fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/434e88fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/434e88fd

Branch: refs/heads/master
Commit: 434e88fdd07a0052d142c275e0e631fc089066b0
Parents: b0befc4
Author: twalthr <twal...@apache.org>
Authored: Thu Mar 3 15:44:28 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 4 20:58:33 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |  14 +--
 .../api/java/typeutils/TypeExtractorTest.java   | 112 +++++++++++++++++++
 2 files changed, 119 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/434e88fd/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 01afe14..dd4b132 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -890,8 +890,8 @@ public class TypeExtractor {
                }
                
                if (!(type instanceof TypeVariable<?>)) {
-                       // check for basic type
-                       if (typeInfo.isBasicType()) {
+                       // check for Java Basic Types
+                       if (typeInfo instanceof BasicTypeInfo) {
                                
                                TypeInformation<?> actual;
                                // check if basic type at all
@@ -904,8 +904,8 @@ public class TypeExtractor {
                                }
                                
                        }
-                       // check for tuple
-                       else if (typeInfo.isTupleType()) {
+                       // check for Java Tuples
+                       else if (typeInfo instanceof TupleTypeInfo) {
                                // check if tuple at all
                                if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) {
                                        throw new InvalidTypesException("Tuple type expected.");
@@ -1079,9 +1079,9 @@ public class TypeExtractor {
                        // check for generic object
                        else if (typeInfo instanceof GenericTypeInfo<?>) {
                                Class<?> clazz = null;
-                               if (!(isClassType(type) && ((GenericTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) {
-                                       throw new InvalidTypesException("Generic object type '"
-                                                       + ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
+                               if (!(isClassType(type) && (clazz = typeToClass(type)).isAssignableFrom(((GenericTypeInfo<?>) typeInfo).getTypeClass()))) {
+                                       throw new InvalidTypesException("Generic type '"
+                                                       + ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' or a subclass of it expected but was '"
                                                        + clazz.getCanonicalName() + "'.");
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/434e88fd/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index cdafa6a..a8f5ded 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -22,8 +22,11 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -38,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -1904,4 +1908,112 @@ public class TypeExtractorTest {
                Either<String, Tuple1<Integer>> either = Either.Left("test");
                TypeExtractor.getForObject(either);
        }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testGenericTypeWithSubclassInput() {
+               Map<String, Object> inputMap = new HashMap<>();
+               inputMap.put("a", "b");
+               TypeInformation<?> inputType = TypeExtractor.createTypeInfo(inputMap.getClass());
+
+               MapFunction<?, ?> function = new MapFunction<Map<String, Object>,Map<String, Object>>(){
+
+                       @Override
+                       public Map<String, Object> map(Map<String, Object> stringObjectMap) throws Exception {
+                               return stringObjectMap;
+                       }
+               };
+
+               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
+               TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class);
+               Assert.assertEquals(expected, ti);
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test(expected=InvalidTypesException.class)
+       public void testGenericTypeWithSuperclassInput() {
+               TypeInformation<?> inputType = TypeExtractor.createTypeInfo(Map.class);
+
+               MapFunction<?, ?> function = new MapFunction<HashMap<String, Object>,Map<String, Object>>(){
+
+                       @Override
+                       public Map<String, Object> map(HashMap<String, Object> stringObjectMap) throws Exception {
+                               return stringObjectMap;
+                       }
+               };
+
+               TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       @Test
+       public void testInputWithCustomTypeInfo() {
+               TypeInformation<?> customTypeInfo = new TypeInformation<Object>() {
+
+                       @Override
+                       public boolean isBasicType() {
+                               return true;
+                       }
+
+                       @Override
+                       public boolean isTupleType() {
+                               return true;
+                       }
+
+                       @Override
+                       public int getArity() {
+                               return 0;
+                       }
+
+                       @Override
+                       public int getTotalFields() {
+                               return 0;
+                       }
+
+                       @Override
+                       public Class getTypeClass() {
+                               return null;
+                       }
+
+                       @Override
+                       public boolean isKeyType() {
+                               return false;
+                       }
+
+                       @Override
+                       public TypeSerializer<Object> createSerializer(ExecutionConfig config) {
+                               return null;
+                       }
+
+                       @Override
+                       public String toString() {
+                               return null;
+                       }
+
+                       @Override
+                       public boolean equals(Object obj) {
+                               return false;
+                       }
+
+                       @Override
+                       public int hashCode() {
+                               return 0;
+                       }
+
+                       @Override
+                       public boolean canEqual(Object obj) {
+                               return false;
+                       }
+               };
+
+               MapFunction<?, ?> function = new MapFunction<Tuple1<String>, Tuple1<Object>>() {
+                       @Override
+                       public Tuple1<Object> map(Tuple1<String> value) throws Exception {
+                               return null;
+                       }
+               };
+
+               TypeExtractor.getMapReturnTypes(function,
+                       (TypeInformation) new TupleTypeInfo<Tuple1<Object>>(customTypeInfo));
+       }
 }

Reply via email to