[FLINK-3566] [FLINK-3563] [core] TypeExtraction input type validation fixes
This closes #1759 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/434e88fd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/434e88fd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/434e88fd Branch: refs/heads/master Commit: 434e88fdd07a0052d142c275e0e631fc089066b0 Parents: b0befc4 Author: twalthr <twal...@apache.org> Authored: Thu Mar 3 15:44:28 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 4 20:58:33 2016 +0100 ---------------------------------------------------------------------- .../flink/api/java/typeutils/TypeExtractor.java | 14 +-- .../api/java/typeutils/TypeExtractorTest.java | 112 +++++++++++++++++++ 2 files changed, 119 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/434e88fd/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 01afe14..dd4b132 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -890,8 +890,8 @@ public class TypeExtractor { } if (!(type instanceof TypeVariable<?>)) { - // check for basic type - if (typeInfo.isBasicType()) { + // check for Java Basic Types + if (typeInfo instanceof BasicTypeInfo) { TypeInformation<?> actual; // check if basic type at all @@ -904,8 +904,8 @@ public class TypeExtractor { } } - // check for tuple - else if (typeInfo.isTupleType()) { + // check for Java Tuples + else if (typeInfo instanceof TupleTypeInfo) { // check if tuple at all if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) { throw new InvalidTypesException("Tuple type expected."); @@ -1079,9 +1079,9 @@ public class TypeExtractor { // check for generic object else if (typeInfo instanceof GenericTypeInfo<?>) { Class<?> clazz = null; - if (!(isClassType(type) && ((GenericTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) { - throw new InvalidTypesException("Generic object type '" - + ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '" + if (!(isClassType(type) && (clazz = typeToClass(type)).isAssignableFrom(((GenericTypeInfo<?>) typeInfo).getTypeClass()))) { + throw new InvalidTypesException("Generic type '" + + ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' or a subclass of it expected but was '" + clazz.getCanonicalName() + "'."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/434e88fd/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index cdafa6a..a8f5ded 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -22,8 +22,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichCoGroupFunction; @@ -38,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; @@ -1904,4 +1908,112 @@ public class TypeExtractorTest { Either<String, Tuple1<Integer>> either = Either.Left("test"); TypeExtractor.getForObject(either); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericTypeWithSubclassInput() { + Map<String, Object> inputMap = new HashMap<>(); + inputMap.put("a", "b"); + TypeInformation<?> inputType = TypeExtractor.createTypeInfo(inputMap.getClass()); + + MapFunction<?, ?> function = new MapFunction<Map<String, Object>,Map<String, Object>>(){ + + @Override + public Map<String, Object> map(Map<String, Object> stringObjectMap) throws Exception { + return stringObjectMap; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType); + TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class); + Assert.assertEquals(expected, ti); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(expected=InvalidTypesException.class) + public void testGenericTypeWithSuperclassInput() { + TypeInformation<?> inputType = TypeExtractor.createTypeInfo(Map.class); + + MapFunction<?, ?> function = new MapFunction<HashMap<String, Object>,Map<String, Object>>(){ + + @Override + public Map<String, Object> map(HashMap<String, Object> stringObjectMap) throws Exception { + return stringObjectMap; + } + }; + + TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInputWithCustomTypeInfo() { + TypeInformation<?> customTypeInfo = new TypeInformation<Object>() { + + @Override + public boolean isBasicType() { + return true; + } + + @Override + public boolean isTupleType() { + return true; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 0; + } + + @Override + public Class getTypeClass() { + return null; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer<Object> createSerializer(ExecutionConfig config) { + return null; + } + + @Override + public String toString() { + return null; + } + + @Override + public boolean equals(Object obj) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean canEqual(Object obj) { + return false; + } + }; + + MapFunction<?, ?> function = new MapFunction<Tuple1<String>, Tuple1<Object>>() { + @Override + public Tuple1<Object> map(Tuple1<String> value) throws Exception { + return null; + } + }; + + TypeExtractor.getMapReturnTypes(function, + (TypeInformation) new TupleTypeInfo<Tuple1<Object>>(customTypeInfo)); + } }