http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java new file mode 100644 index 0000000..cdafa6a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -0,0 +1,1907 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.common.functions.RichCrossFunction; +import org.apache.flink.api.common.functions.RichFlatJoinFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.types.Either; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.StringValue; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; + +import org.apache.hadoop.io.Writable; + +import org.junit.Assert; +import org.junit.Test; + +@SuppressWarnings("serial") +public class TypeExtractorTest { + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testBasicType() { + // use getGroupReduceReturnTypes() + RichGroupReduceFunction<?, ?> function = new RichGroupReduceFunction<Boolean, Boolean>() { + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<Boolean> values, Collector<Boolean> out) throws Exception { + // nothing to do + } + }; + + TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean")); + + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + Assert.assertEquals(Boolean.class, ti.getTypeClass()); + + // use getForClass() + Assert.assertTrue(TypeExtractor.getForClass(Boolean.class).isBasicType()); + Assert.assertEquals(ti, TypeExtractor.getForClass(Boolean.class)); + + // use getForObject() + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeExtractor.getForObject(true)); + } + + public static class MyWritable implements Writable { + + @Override + public void write(DataOutput out) throws IOException { + + } + + @Override + public void readFields(DataInput in) throws IOException { + } + + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testWritableType() { + RichMapFunction<?, ?> function = new RichMapFunction<MyWritable, MyWritable>() { + private static final long serialVersionUID = 1L; + + @Override + public MyWritable map(MyWritable value) throws Exception { + return null; + } + + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) new WritableTypeInfo<MyWritable>(MyWritable.class)); + + Assert.assertTrue(ti instanceof WritableTypeInfo<?>); + Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTupleWithBasicTypes() throws Exception { + // use getMapReturnTypes() + RichMapFunction<?, ?> function = new RichMapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte> map( + Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte> value) throws Exception { + return null; + } + + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(9, ti.getArity()); + Assert.assertTrue(ti instanceof TupleTypeInfo); + List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); + ((TupleTypeInfo) ti).getFlatFields("f3", 0, ffd); + Assert.assertTrue(ffd.size() == 1); + Assert.assertEquals(3, ffd.get(0).getPosition() ); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(Tuple9.class, tti.getTypeClass()); + + for (int i = 0; i < 9; i++) { + Assert.assertTrue(tti.getTypeAt(i) instanceof BasicTypeInfo); + } + + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2)); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(3)); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(4)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(5)); + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(6)); + Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tti.getTypeAt(7)); + Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, tti.getTypeAt(8)); + + // use getForObject() + Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte> t = new Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>( + 1, 1L, 1.0, 1.0F, false, "Hello World", 'w', (short) 1, (byte) 1); + + Assert.assertTrue(TypeExtractor.getForObject(t) instanceof TupleTypeInfo); + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) TypeExtractor.getForObject(t); + + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti2.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti2.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti2.getTypeAt(2)); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti2.getTypeAt(3)); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti2.getTypeAt(4)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti2.getTypeAt(5)); + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti2.getTypeAt(6)); + Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tti2.getTypeAt(7)); + Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, tti2.getTypeAt(8)); + + // test that getForClass does not work + try { + TypeExtractor.getForClass(Tuple9.class); + Assert.fail("Exception expected here"); + } catch (InvalidTypesException e) { + // that is correct + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTupleWithTuples() { + // use getFlatMapReturnTypes() + RichFlatMapFunction<?, ?> function = new RichFlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>> value, + Collector<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>> out) throws Exception { + // nothing to do + } + }; + + TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>")); + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(3, ti.getArity()); + Assert.assertTrue(ti instanceof TupleTypeInfo); + List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); + + ((TupleTypeInfo) ti).getFlatFields("f0.f0", 0, ffd); + Assert.assertEquals(0, ffd.get(0).getPosition() ); + ffd.clear(); + + ((TupleTypeInfo) ti).getFlatFields("f0.f0", 0, ffd); + Assert.assertTrue( ffd.get(0).getType() instanceof BasicTypeInfo ); + Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(String.class) ); + ffd.clear(); + + ((TupleTypeInfo) ti).getFlatFields("f1.f0", 0, ffd); + Assert.assertEquals(1, ffd.get(0).getPosition() ); + ffd.clear(); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(Tuple3.class, tti.getTypeClass()); + + Assert.assertTrue(tti.getTypeAt(0).isTupleType()); + Assert.assertTrue(tti.getTypeAt(1).isTupleType()); + Assert.assertTrue(tti.getTypeAt(2).isTupleType()); + + Assert.assertEquals(Tuple1.class, tti.getTypeAt(0).getTypeClass()); + Assert.assertEquals(Tuple1.class, tti.getTypeAt(1).getTypeClass()); + Assert.assertEquals(Tuple2.class, tti.getTypeAt(2).getTypeClass()); + + Assert.assertEquals(1, tti.getTypeAt(0).getArity()); + Assert.assertEquals(1, tti.getTypeAt(1).getArity()); + Assert.assertEquals(2, tti.getTypeAt(2).getArity()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) tti.getTypeAt(0)).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>) tti.getTypeAt(1)).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>) tti.getTypeAt(2)).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>) tti.getTypeAt(2)).getTypeAt(1)); + + // use getForObject() + Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>> t = new Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>( + new Tuple1<String>("hello"), new Tuple1<Integer>(1), new Tuple2<Long, Long>(2L, 3L)); + Assert.assertTrue(TypeExtractor.getForObject(t) instanceof TupleTypeInfo); + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) TypeExtractor.getForObject(t); + + Assert.assertEquals(1, tti2.getTypeAt(0).getArity()); + Assert.assertEquals(1, tti2.getTypeAt(1).getArity()); + Assert.assertEquals(2, tti2.getTypeAt(2).getArity()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) tti2.getTypeAt(0)).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>) tti2.getTypeAt(1)).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>) tti2.getTypeAt(2)).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>) tti2.getTypeAt(2)).getTypeAt(1)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTuple0() { + // use getFlatMapReturnTypes() + RichFlatMapFunction<?, ?> function = new RichFlatMapFunction<Tuple0, Tuple0>() { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple0 value, Collector<Tuple0> out) throws Exception { + // nothing to do + } + }; + + TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function, + (TypeInformation) TypeInfoParser.parse("Tuple0")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(0, ti.getArity()); + Assert.assertTrue(ti instanceof TupleTypeInfo); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testSubclassOfTuple() { + // use getJoinReturnTypes() + RichFlatJoinFunction<?, ?, ?> function = new RichFlatJoinFunction<CustomTuple, String, CustomTuple>() { + private static final long serialVersionUID = 1L; + + @Override + public void join(CustomTuple first, String second, Collector<CustomTuple> out) throws Exception { + out.collect(null); + } + }; + + TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), (TypeInformation) TypeInfoParser.parse("String")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>) ti).getTypeAt(1)); + Assert.assertEquals(CustomTuple.class, ((TupleTypeInfo<?>) ti).getTypeClass()); + + // use getForObject() + CustomTuple t = new CustomTuple("hello", 1); + TypeInformation<?> ti2 = TypeExtractor.getForObject(t); + + Assert.assertTrue(ti2.isTupleType()); + Assert.assertEquals(2, ti2.getArity()); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) ti2).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>) ti2).getTypeAt(1)); + Assert.assertEquals(CustomTuple.class, ((TupleTypeInfo<?>) ti2).getTypeClass()); + } + + public static class CustomTuple extends Tuple2<String, Integer> { + private static final long serialVersionUID = 1L; + + public CustomTuple(String myField1, Integer myField2) { + this.setFields(myField1, myField2); + } + + public String getMyField1() { + return this.f0; + } + + public int getMyField2() { + return this.f1; + } + } + + public static class PojoWithNonPublicDefaultCtor { + public int foo, bar; + PojoWithNonPublicDefaultCtor() {} + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testPojo() { + // use getCrossReturnTypes() + RichCrossFunction<?, ?, ?> function = new RichCrossFunction<CustomType, Integer, CustomType>() { + private static final long serialVersionUID = 1L; + + @Override + public CustomType cross(CustomType first, Integer second) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(function, + (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType"), + (TypeInformation) TypeInfoParser.parse("Integer")); + + Assert.assertFalse(ti.isBasicType()); + Assert.assertFalse(ti.isTupleType()); + Assert.assertTrue(ti instanceof PojoTypeInfo); + Assert.assertEquals(ti.getTypeClass(), CustomType.class); + + // use getForClass() + Assert.assertTrue(TypeExtractor.getForClass(CustomType.class) instanceof PojoTypeInfo); + Assert.assertEquals(TypeExtractor.getForClass(CustomType.class).getTypeClass(), ti.getTypeClass()); + + // use getForObject() + CustomType t = new CustomType("World", 1); + TypeInformation<?> ti2 = TypeExtractor.getForObject(t); + + Assert.assertFalse(ti2.isBasicType()); + Assert.assertFalse(ti2.isTupleType()); + Assert.assertTrue(ti2 instanceof PojoTypeInfo); + Assert.assertEquals(ti2.getTypeClass(), CustomType.class); + + Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class) instanceof PojoTypeInfo); + } + + + + public static class CustomType { + public String myField1; + public int myField2; + + public CustomType() { + } + + public CustomType(String myField1, int myField2) { + this.myField1 = myField1; + this.myField2 = myField2; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTupleWithPojo() { + // use getMapReturnTypes() + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Long, CustomType> map(Tuple2<Long, CustomType> value) throws Exception { + return null; + } + + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, + (TypeInformation) TypeInfoParser.parse("Tuple2<Long,org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(Tuple2.class, tti.getTypeClass()); + List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); + + tti.getFlatFields("f0", 0, ffd); + Assert.assertEquals(1, ffd.size()); + Assert.assertEquals(0, ffd.get(0).getPosition() ); // Long + Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(Long.class) ); + ffd.clear(); + + tti.getFlatFields("f1.myField1", 0, ffd); + Assert.assertEquals(1, ffd.get(0).getPosition() ); + Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(String.class) ); + ffd.clear(); + + + tti.getFlatFields("f1.myField2", 0, ffd); + Assert.assertEquals(2, ffd.get(0).getPosition() ); + Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(Integer.class) ); + + + Assert.assertEquals(Long.class, tti.getTypeAt(0).getTypeClass()); + Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo); + Assert.assertEquals(CustomType.class, tti.getTypeAt(1).getTypeClass()); + + // use getForObject() + Tuple2<?, ?> t = new Tuple2<Long, CustomType>(1L, new CustomType("Hello", 1)); + TypeInformation<?> ti2 = TypeExtractor.getForObject(t); + + Assert.assertTrue(ti2.isTupleType()); + Assert.assertEquals(2, ti2.getArity()); + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) ti2; + + Assert.assertEquals(Tuple2.class, tti2.getTypeClass()); + Assert.assertEquals(Long.class, tti2.getTypeAt(0).getTypeClass()); + Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo); + Assert.assertEquals(CustomType.class, tti2.getTypeAt(1).getTypeClass()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testValue() { + // use getKeyExtractorType() + KeySelector<?, ?> function = new KeySelector<StringValue, StringValue>() { + private static final long serialVersionUID = 1L; + + @Override + public StringValue getKey(StringValue value) { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(function, (TypeInformation) TypeInfoParser.parse("StringValue")); + + Assert.assertFalse(ti.isBasicType()); + Assert.assertFalse(ti.isTupleType()); + Assert.assertTrue(ti instanceof ValueTypeInfo); + Assert.assertEquals(ti.getTypeClass(), StringValue.class); + + // use getForClass() + Assert.assertTrue(TypeExtractor.getForClass(StringValue.class) instanceof ValueTypeInfo); + Assert.assertEquals(TypeExtractor.getForClass(StringValue.class).getTypeClass(), ti.getTypeClass()); + + // use getForObject() + StringValue v = new StringValue("Hello"); + Assert.assertTrue(TypeExtractor.getForObject(v) instanceof ValueTypeInfo); + Assert.assertEquals(TypeExtractor.getForObject(v).getTypeClass(), ti.getTypeClass()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTupleOfValues() { + // use getMapReturnTypes() + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<StringValue, IntValue> map(Tuple2<StringValue, IntValue> value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<StringValue, IntValue>")); + + Assert.assertFalse(ti.isBasicType()); + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(StringValue.class, ((TupleTypeInfo<?>) ti).getTypeAt(0).getTypeClass()); + Assert.assertEquals(IntValue.class, ((TupleTypeInfo<?>) ti).getTypeAt(1).getTypeClass()); + + // use getForObject() + Tuple2<StringValue, IntValue> t = new Tuple2<StringValue, IntValue>(new StringValue("x"), new IntValue(1)); + TypeInformation<?> ti2 = TypeExtractor.getForObject(t); + + Assert.assertFalse(ti2.isBasicType()); + Assert.assertTrue(ti2.isTupleType()); + Assert.assertEquals(((TupleTypeInfo<?>) ti2).getTypeAt(0).getTypeClass(), StringValue.class); + Assert.assertEquals(((TupleTypeInfo<?>) ti2).getTypeAt(1).getTypeClass(), IntValue.class); + } + + public static class LongKeyValue<V> extends Tuple2<Long, V> { + private static final long serialVersionUID = 1L; + + public LongKeyValue(Long field1, V field2) { + this.f0 = field1; + this.f1 = field2; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericsNotInSuperclass() { + // use getMapReturnTypes() + RichMapFunction<?, ?> function = new RichMapFunction<LongKeyValue<String>, LongKeyValue<String>>() { + private static final long serialVersionUID = 1L; + + @Override + public LongKeyValue<String> map(LongKeyValue<String> value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Long, String>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(LongKeyValue.class, tti.getTypeClass()); + + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public static class ChainedOne<X, Y> extends Tuple3<X, Long, Y> { + private static final long serialVersionUID = 1L; + + public ChainedOne(X field0, Long field1, Y field2) { + this.f0 = field0; + this.f1 = field1; + this.f2 = field2; + } + } + + public static class ChainedTwo<V> extends ChainedOne<String, V> { + private static final long serialVersionUID = 1L; + + public ChainedTwo(String field0, Long field1, V field2) { + super(field0, field1, field2); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testChainedGenericsNotInSuperclass() { + // use TypeExtractor + RichMapFunction<?, ?> function = new RichMapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public ChainedTwo<Integer> map(ChainedTwo<Integer> value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, Integer>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(3, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(ChainedTwo.class, tti.getTypeClass()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(2)); + } + + public static class ChainedThree extends ChainedTwo<String> { + private static final long serialVersionUID = 1L; + + public ChainedThree(String field0, Long field1, String field2) { + super(field0, field1, field2); + } + } + + public static class ChainedFour extends ChainedThree { + private static final long serialVersionUID = 1L; + + public ChainedFour(String field0, Long field1, String field2) { + super(field0, field1, field2); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericsInDirectSuperclass() { + // use TypeExtractor + RichMapFunction<?, ?> function = new RichMapFunction<ChainedThree, ChainedThree>() { + private static final long serialVersionUID = 1L; + + @Override + public ChainedThree map(ChainedThree value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, String>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(3, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(ChainedThree.class, tti.getTypeClass()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(2)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testGenericsNotInSuperclassWithNonGenericClassAtEnd() { + // use TypeExtractor + RichMapFunction<?, ?> function = new RichMapFunction<ChainedFour, ChainedFour>() { + private static final long serialVersionUID = 1L; + + @Override + public ChainedFour map(ChainedFour value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, String>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(3, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(ChainedFour.class, tti.getTypeClass()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(2)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testMissingTupleGenerics() { + RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple2>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2 map(String value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + + try { + TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTupleSupertype() { + RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple map(String value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + + try { + TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected + } + } + + public static class SameTypeVariable<X> extends Tuple2<X, X> { + private static final long serialVersionUID = 1L; + + public SameTypeVariable(X field0, X field1) { + super(field0, field1); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testSameGenericVariable() { + RichMapFunction<?, ?> function = new RichMapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() { + private static final long serialVersionUID = 1L; + + @Override + public SameTypeVariable<String> map(SameTypeVariable<String> value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(SameTypeVariable.class, tti.getTypeClass()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public static class Nested<V, T> extends Tuple2<V, Tuple2<T, T>> { + private static final long serialVersionUID = 1L; + + public Nested(V field0, Tuple2<T, T> field1) { + super(field0, field1); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testNestedTupleGenerics() { + RichMapFunction<?, ?> function = new RichMapFunction<Nested<String, Integer>, Nested<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public Nested<String, Integer> map(Nested<String, Integer> value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Tuple2<Integer, Integer>>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(Nested.class, tti.getTypeClass()); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertTrue(tti.getTypeAt(1).isTupleType()); + Assert.assertEquals(2, tti.getTypeAt(1).getArity()); + + // Nested + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(1); + Assert.assertEquals(Tuple2.class, tti2.getTypeClass()); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti2.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti2.getTypeAt(1)); + } + + public static class Nested2<T> extends Nested<T, Nested<Integer, T>> { + private static final long serialVersionUID = 1L; + + public Nested2(T field0, Tuple2<Nested<Integer, T>, Nested<Integer, T>> field1) { + super(field0, field1); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testNestedTupleGenerics2() { + RichMapFunction<?, ?> function = new RichMapFunction<Nested2<Boolean>, Nested2<Boolean>>() { + private static final long serialVersionUID = 1L; + + @Override + public Nested2<Boolean> map(Nested2<Boolean> value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>")); + + // Should be + // Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>> + + // 1st nested level + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertTrue(tti.getTypeAt(1).isTupleType()); + + // 2nd nested level + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(1); + Assert.assertTrue(tti2.getTypeAt(0).isTupleType()); + Assert.assertTrue(tti2.getTypeAt(1).isTupleType()); + + // 3rd nested level + TupleTypeInfo<?> tti3 = (TupleTypeInfo<?>) tti2.getTypeAt(0); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti3.getTypeAt(0)); + Assert.assertTrue(tti3.getTypeAt(1).isTupleType()); + + // 4th nested level + TupleTypeInfo<?> tti4 = (TupleTypeInfo<?>) tti3.getTypeAt(1); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti4.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti4.getTypeAt(1)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testFunctionWithMissingGenerics() { + RichMapFunction function = new RichMapFunction() { + private static final long serialVersionUID = 1L; + + @Override + public String map(Object value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + + try { + TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String")); + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testFunctionDependingOnInputAsSuperclass() { + IdentityMapper<Boolean> function = new IdentityMapper<Boolean>() { + private static final long serialVersionUID = 1L; + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean")); + + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + } + + public class IdentityMapper<T> extends RichMapFunction<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T map(T value) throws Exception { + return null; + } + } + + @Test + public void testFunctionDependingOnInputFromInput() { + IdentityMapper<Boolean> function = new IdentityMapper<Boolean>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO); + + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + } + + @Test + public void testFunctionDependingOnInputWithMissingInput() { + IdentityMapper<Boolean> function = new IdentityMapper<Boolean>(); + + try { + TypeExtractor.getMapReturnTypes(function, null); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + } + + public class IdentityMapper2<T> extends RichMapFunction<Tuple2<T, String>, T> { + private static final long serialVersionUID = 1L; + + @Override + public T map(Tuple2<T, String> value) throws Exception { + return null; + } + } + + @Test + public void testFunctionDependingOnInputWithTupleInput() { + IdentityMapper2<Boolean> function = new IdentityMapper2<Boolean>(); + + TypeInformation<Tuple2<Boolean, String>> inputType = new TupleTypeInfo<Tuple2<Boolean, String>>(BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, inputType); + + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testFunctionDependingOnInputWithCustomTupleInput() { + IdentityMapper<SameTypeVariable<String>> function = new IdentityMapper<SameTypeVariable<String>>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public class IdentityMapper3<T, V> extends RichMapFunction<T, V> { + private static final long serialVersionUID = 1L; + + @Override + public V map(T value) throws Exception { + return null; + } + } + + @Test + public void testFunctionDependingOnUnknownInput() { + IdentityMapper3<Boolean, String> function = new IdentityMapper3<Boolean, String>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO, "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + + try { + TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO); + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected + } + } + + public class IdentityMapper4<D> extends IdentityMapper<D> { + private static final long serialVersionUID = 1L; + } + + @Test + public void testFunctionDependingOnInputWithFunctionHierarchy() { + IdentityMapper4<String> function = new IdentityMapper4<String>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO); + + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); + } + + public class IdentityMapper5<D> extends IdentityMapper<Tuple2<D, D>> { + private static final long serialVersionUID = 1L; + } + + @Test + public void testFunctionDependingOnInputWithFunctionHierarchy2() { + IdentityMapper5<String> function = new IdentityMapper5<String>(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)); + + Assert.assertTrue(ti.isTupleType()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public class Mapper extends IdentityMapper<String> { + private static final long serialVersionUID = 1L; + + @Override + public String map(String value) throws Exception { + return null; + } + } + + public class Mapper2 extends Mapper { + private static final long serialVersionUID = 1L; + + @Override + public String map(String value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testFunctionWithNoGenericSuperclass() { + RichMapFunction<?, ?> function = new Mapper2(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); + + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); + } + + public class OneAppender<T> extends RichMapFunction<T, Tuple2<T, Integer>> { + private static final long serialVersionUID = 1L; + + public Tuple2<T, Integer> map(T value) { + return new Tuple2<T, Integer>(value, 1); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testFunctionDependingPartialOnInput() { + RichMapFunction<?, ?> function = new OneAppender<DoubleValue>() { + private static final long serialVersionUID = 1L; + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("DoubleValue")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + + Assert.assertTrue(tti.getTypeAt(0) instanceof ValueTypeInfo<?>); + ValueTypeInfo<?> vti = (ValueTypeInfo<?>) tti.getTypeAt(0); + Assert.assertEquals(DoubleValue.class, vti.getTypeClass()); + + Assert.assertTrue(tti.getTypeAt(1).isBasicType()); + Assert.assertEquals(Integer.class , tti.getTypeAt(1).getTypeClass()); + } + + @Test + public void testFunctionDependingPartialOnInput2() { + RichMapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new ValueTypeInfo<DoubleValue>(DoubleValue.class)); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + + Assert.assertTrue(tti.getTypeAt(0) instanceof ValueTypeInfo<?>); + ValueTypeInfo<?> vti = (ValueTypeInfo<?>) tti.getTypeAt(0); + Assert.assertEquals(DoubleValue.class, vti.getTypeClass()); + + Assert.assertTrue(tti.getTypeAt(1).isBasicType()); + Assert.assertEquals(Integer.class , tti.getTypeAt(1).getTypeClass()); + } + + public class FieldDuplicator<T> extends RichMapFunction<T, Tuple2<T, T>> { + private static final long serialVersionUID = 1L; + + public Tuple2<T, T> map(T value) { + return new Tuple2<T, T>(value, value); + } + } + + @Test + public void testFunctionInputInOutputMultipleTimes() { + RichMapFunction<Float, ?> function = new FieldDuplicator<Float>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.FLOAT_TYPE_INFO); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(1)); + } + + @Test + public void testFunctionInputInOutputMultipleTimes2() { + RichMapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new TupleTypeInfo<Tuple2<Float, Float>>( + BasicTypeInfo.FLOAT_TYPE_INFO, BasicTypeInfo.FLOAT_TYPE_INFO)); + + // should be + // Tuple2<Tuple2<Float, Float>, Tuple2<Float, Float>> + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + + // 2nd nested level + Assert.assertTrue(tti.getTypeAt(0).isTupleType()); + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(0); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti2.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti2.getTypeAt(1)); + Assert.assertTrue(tti.getTypeAt(0).isTupleType()); + TupleTypeInfo<?> tti3 = (TupleTypeInfo<?>) tti.getTypeAt(1); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti3.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti3.getTypeAt(1)); + } + + public interface Testable {} + + public static abstract class AbstractClassWithoutMember {} + + public static abstract class AbstractClassWithMember { + public int x; + } + + @Test + public void testAbstractAndInterfaceTypes() { + + // interface + RichMapFunction<String, ?> function = new RichMapFunction<String, Testable>() { + private static final long serialVersionUID = 1L; + + @Override + public Testable map(String value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertTrue(ti instanceof GenericTypeInfo); + + // abstract class with out class member + RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClassWithoutMember>() { + private static final long serialVersionUID = 1L; + + @Override + public AbstractClassWithoutMember map(String value) throws Exception { + return null; + } + }; + + ti = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertTrue(ti instanceof GenericTypeInfo); + + // abstract class with class member + RichMapFunction<String, ?> function3 = new RichMapFunction<String, AbstractClassWithMember>() { + private static final long serialVersionUID = 1L; + + @Override + public AbstractClassWithMember map(String value) throws Exception { + return null; + } + }; + + ti = TypeExtractor.getMapReturnTypes(function3, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertTrue(ti instanceof PojoTypeInfo); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testValueSupertypeException() { + RichMapFunction<?, ?> function = new RichMapFunction<StringValue, Value>() { + private static final long serialVersionUID = 1L; + + @Override + public Value map(StringValue value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + + try { + TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue")); + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testBasicArray() { + // use getCoGroupReturnTypes() + RichCoGroupFunction<?, ?, ?> function = new RichCoGroupFunction<String[], String[], String[]>() { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<String[]> first, Iterable<String[]> second, Collector<String[]> out) throws Exception { + // nothing to do + } + }; + + TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String[]"), (TypeInformation) TypeInfoParser.parse("String[]")); + + Assert.assertFalse(ti.isBasicType()); + Assert.assertFalse(ti.isTupleType()); + + // Due to a Java 6 bug the classification can be slightly wrong + Assert.assertTrue(ti instanceof BasicArrayTypeInfo<?,?> || ti instanceof ObjectArrayTypeInfo<?,?>); + + if(ti instanceof BasicArrayTypeInfo<?,?>) { + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, ti); + } + else { + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((ObjectArrayTypeInfo<?,?>) ti).getComponentInfo()); + } + } + + @Test + public void testBasicArray2() { + RichMapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO); + + Assert.assertTrue(ti instanceof BasicArrayTypeInfo<?, ?>); + BasicArrayTypeInfo<?, ?> bati = (BasicArrayTypeInfo<?, ?>) ti; + Assert.assertTrue(bati.getComponentInfo().isBasicType()); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, bati.getComponentInfo()); + } + + public static class CustomArrayObject {} + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testCustomArray() { + RichMapFunction<?, ?> function = new RichMapFunction<CustomArrayObject[], CustomArrayObject[]>() { + private static final long serialVersionUID = 1L; + + @Override + public CustomArrayObject[] map(CustomArrayObject[] value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, + (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomArrayObject[]")); + + Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>); + Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testTupleArray() { + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, String>[] map(Tuple2<String, String>[] value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>[]")); + + Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>); + ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti; + Assert.assertTrue(oati.getComponentInfo().isTupleType()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) oati.getComponentInfo(); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public class CustomArrayObject2<F> extends Tuple1<F> { + private static final long serialVersionUID = 1L; + + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testCustomArrayWithTypeVariable() { + RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple1<Boolean>[]")); + + Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>); + ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti; + Assert.assertTrue(oati.getComponentInfo().isTupleType()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) oati.getComponentInfo(); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(0)); + } + + public class GenericArrayClass<T> extends RichMapFunction<T[], T[]> { + private static final long serialVersionUID = 1L; + + @Override + public T[] map(T[] value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testParameterizedArrays() { + GenericArrayClass<Boolean> function = new GenericArrayClass<Boolean>(){ + private static final long serialVersionUID = 1L; + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean[]")); + Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?,?>); + ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti; + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, oati.getComponentInfo()); + } + + public static class MyObject<T> { + public T myField; + } + + public static class InType extends MyObject<String> {} + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testParameterizedPojo() { + RichMapFunction<?, ?> function = new RichMapFunction<InType, MyObject<String>>() { + private static final long serialVersionUID = 1L; + + @Override + public MyObject<String> map(InType value) throws Exception { + return null; + } + }; + TypeInformation<?> inType = TypeExtractor.createTypeInfo(InType.class); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inType); + Assert.assertTrue(ti instanceof PojoTypeInfo); + } + + @Test + public void testFunctionDependingOnInputWithTupleInputWithTypeMismatch() { + IdentityMapper2<Boolean> function = new IdentityMapper2<Boolean>(); + + TypeInformation<Tuple2<Boolean, String>> inputType = new TupleTypeInfo<Tuple2<Boolean, String>>(BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + // input is: Tuple2<Boolean, Integer> + // allowed: Tuple2<?, String> + + try { + TypeExtractor.getMapReturnTypes(function, inputType); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testInputMismatchExceptions() { + + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>, String>() { + private static final long serialVersionUID = 1L; + + @Override + public String map(Tuple2<String, String> value) throws Exception { + return null; + } + }; + + try { + TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Integer, String>")); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + + try { + TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, String, String>")); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + + RichMapFunction<?, ?> function2 = new RichMapFunction<StringValue, String>() { + private static final long serialVersionUID = 1L; + + @Override + public String map(StringValue value) throws Exception { + return null; + } + }; + + try { + TypeExtractor.getMapReturnTypes(function2, (TypeInformation) TypeInfoParser.parse("IntValue")); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + + RichMapFunction<?, ?> function3 = new RichMapFunction<Tuple1<Integer>[], String>() { + private static final long serialVersionUID = 1L; + + @Override + public String map(Tuple1<Integer>[] value) throws Exception { + return null; + } + }; + + try { + TypeExtractor.getMapReturnTypes(function3, (TypeInformation) TypeInfoParser.parse("Integer[]")); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + + RichMapFunction<?, ?> function4 = new RichMapFunction<Writable, String>() { + private static final long serialVersionUID = 1L; + + @Override + public String map(Writable value) throws Exception { + return null; + } + }; + + try { + TypeExtractor.getMapReturnTypes(function4, (TypeInformation) new WritableTypeInfo<MyWritable>(MyWritable.class)); + Assert.fail("exception expected"); + } catch (InvalidTypesException e) { + // right + } + } + + public static class DummyFlatMapFunction<A,B,C,D> extends RichFlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple2<A, B> value, Collector<Tuple2<C, D>> out) throws Exception { + + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTypeErasure() { + TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(), + (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), "name", true); + Assert.assertTrue(ti instanceof MissingTypeInfo); + + try { + TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(), + (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>")); + + Assert.fail("Expected an exception"); + } + catch (InvalidTypesException e) { + // expected + } + } + + public static class MyQueryableMapper<A> extends RichMapFunction<String, A> implements ResultTypeQueryable<A> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unchecked") + @Override + public TypeInformation<A> getProducedType() { + return (TypeInformation<A>) BasicTypeInfo.INT_TYPE_INFO; + } + + @Override + public A map(String value) throws Exception { + return null; + } + } + + @Test + public void testResultTypeQueryable() { + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyQueryableMapper<Integer>(), BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); + } + + @Test + public void testTupleWithPrimitiveArray() { + RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple9<int[], double[], long[], byte[], char[], float[], short[], boolean[], String[]> map(Integer value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.INT_TYPE_INFO); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(2)); + Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(3)); + Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(4)); + Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(5)); + Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(6)); + Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(7)); + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, tti.getTypeAt(8)); + } + + @Test + public void testFunction() { + RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() { + + private static final long serialVersionUID = 1L; + + @Override + public void setRuntimeContext(RuntimeContext t) { + + } + + @Override + public void open(Configuration parameters) throws Exception { + } + + @Override + public RuntimeContext getRuntimeContext() { + return null; + } + + @Override + public void close() throws Exception { + + } + + @Override + public Boolean map(String record) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + } + + @Test + public void testInterface() { + MapFunction<String, Boolean> mapInterface = new MapFunction<String, Boolean>() { + private static final long serialVersionUID = 1L; + + @Override + public Boolean map(String record) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + } + + @Test + public void testCreateTypeInfoFromInstance() { + ResultTypeQueryable instance = new ResultTypeQueryable<Long>() { + @Override + public TypeInformation<Long> getProducedType() { + return BasicTypeInfo.LONG_TYPE_INFO; + } + }; + TypeInformation<?> ti = TypeExtractor.createTypeInfo(instance, null, null, 0); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ti); + + // method also needs to work for instances that do not implement ResultTypeQueryable + MapFunction<Integer, Long> func = new MapFunction<Integer, Long>() { + @Override + public Long map(Integer value) throws Exception { + return value.longValue(); + } + }; + ti = TypeExtractor.createTypeInfo(func, MapFunction.class, func.getClass(), 0); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); + } + + @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) + @Test + public void testExtractKeySelector() { + KeySelector<String, Integer> selector = new KeySelector<String, Integer>() { + @Override + public Integer getKey(String value) { return null; } + }; + + TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(selector, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); + + try { + TypeExtractor.getKeySelectorTypes((KeySelector) selector, BasicTypeInfo.BOOLEAN_TYPE_INFO); + Assert.fail(); + } + catch (InvalidTypesException e) { + // good + } + catch (Exception e) { + Assert.fail("wrong exception type"); + } + } + + public static class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<T, T> map(Tuple1<T> vertex) { + return new Tuple2<T, T>(vertex.f0, vertex.f0); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testDuplicateValue() { + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValue<String>(), TypeInfoParser.parse("Tuple1<String>")); + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public static class DuplicateValueNested<T> implements MapFunction<Tuple1<Tuple1<T>>, Tuple2<T, T>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<T, T> map(Tuple1<Tuple1<T>> vertex) { + return new Tuple2<T, T>(null, null); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testDuplicateValueNested() { + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValueNested<String>(), TypeInfoParser.parse("Tuple1<Tuple1<String>>")); + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(2, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + } + + public static class Edge<K, V> extends Tuple3<K, K, V> { + private static final long serialVersionUID = 1L; + + } + + public static class EdgeMapper<K, V> implements MapFunction<Edge<K, V>, Edge<K, V>> { + private static final long serialVersionUID = 1L; + + @Override + public Edge<K, V> map(Edge<K, V> value) throws Exception { + return null; + } + + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInputInference1() { + EdgeMapper<String, Double> em = new EdgeMapper<String, Double>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<String, String, Double>")); + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(3, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2)); + } + + public static class EdgeMapper2<V> implements MapFunction<V, Edge<Long, V>> { + private static final long serialVersionUID = 1L; + + @Override + public Edge<Long, V> map(V value) throws Exception { + return null; + } + + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInputInference2() { + EdgeMapper2<Boolean> em = new EdgeMapper2<Boolean>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Boolean")); + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(3, ti.getArity()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1)); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(2)); + } + + public static class EdgeMapper3<K, V> implements MapFunction<Edge<K, V>, V> { + private static final long serialVersionUID = 1L; + + @Override + public V map(Edge<K, V> value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInputInference3() { + EdgeMapper3<Boolean, String> em = new EdgeMapper3<Boolean, String>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>")); + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); + } + + public static class EdgeMapper4<K, V> implements MapFunction<Edge<K, V>[], V> { + private static final long serialVersionUID = 1L; + + @Override + public V map(Edge<K, V>[] value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInputInference4() { + EdgeMapper4<Boolean, String> em = new EdgeMapper4<Boolean, String>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>[]")); + Assert.assertTrue(ti.isBasicType()); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); + } + + public static enum MyEnum { + ONE, TWO, THREE + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testEnumType() { + MapFunction<?, ?> mf = new MapFunction<MyEnum, MyEnum>() { + private static final long serialVersionUID = 1L; + + @Override + public MyEnum map(MyEnum value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) mf, new EnumTypeInfo(MyEnum.class)); + Assert.assertTrue(ti instanceof EnumTypeInfo); + Assert.assertEquals(ti.getTypeClass(), MyEnum.class); + } + + public static class MapperWithMultiDimGenericArray<T> implements MapFunction<T[][][], Tuple1<T>[][][]> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple1<T>[][][] map(T[][][] value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testMultiDimensionalArray() { + // tuple array + MapFunction<?,?> function = new MapFunction<Tuple2<Integer, Double>[][], Tuple2<Integer, Double>[][]>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Double>[][] map( + Tuple2<Integer, Double>[][] value) throws Exception { + return null; + } + }; + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("Tuple2<Integer, Double>[][]")); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple2<Integer, Double>>>", ti.toString()); + + // primitive array + function = new MapFunction<int[][][], int[][][]>() { + private static final long serialVersionUID = 1L; + + @Override + public int[][][] map( + int[][][] value) throws Exception { + return null; + } + }; + ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("int[][][]")); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<int[]>>", ti.toString()); + + // basic array + function = new MapFunction<Integer[][][], Integer[][][]>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer[][][] map( + Integer[][][] value) throws Exception { + return null; + } + }; + ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("Integer[][][]")); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<Integer>>>", ti.toString()); + + // pojo array + function = new MapFunction<CustomType[][][], CustomType[][][]>() { + private static final long serialVersionUID = 1L; + + @Override + public CustomType[][][] map( + CustomType[][][] value) throws Exception { + return null; + } + }; + ti = TypeExtractor.getMapReturnTypes((MapFunction)function, + TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType<" + + "myField1=String,myField2=int" + + ">[][][]")); + + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<" + + "PojoType<org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType, fields = [myField1: String, myField2: Integer]>" + + ">>>", ti.toString()); + + // generic array + ti = TypeExtractor.getMapReturnTypes((MapFunction) new MapperWithMultiDimGenericArray<String>(), TypeInfoParser.parse("String[][][]")); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple1<String>>>>", ti.toString()); + } + + @SuppressWarnings("rawtypes") + public static class MapWithResultTypeQueryable implements MapFunction, ResultTypeQueryable { + private static final long serialVersionUID = 1L; + + @Override + public TypeInformation getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } + + @Override + public Object map(Object value) throws Exception { + return null; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInputMismatchWithRawFuntion() { + MapFunction<?, ?> function = new MapWithResultTypeQueryable(); + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, BasicTypeInfo.INT_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); + } + + public static class Either1<T> extends Either<String, T> { + @Override + public String left() throws IllegalStateException { + return null; + } + + @Override + public T right() throws IllegalStateException { + return null; + } + } + + public static class Either2 extends Either1<Tuple1<Integer>> { + // nothing to do here + } + + public static class EitherMapper<T> implements MapFunction<T, Either1<T>> { + @Override + public Either1<T> map(T value) throws Exception { + return null; + } + } + + public static class EitherMapper2 implements MapFunction<String, Either2> { + @Override + public Either2 map(String value) throws Exception { + return null; + } + } + + public static class EitherMapper3 implements MapFunction<Either2, Either2> { + @Override + public Either2 map(Either2 value) throws Exception { + return null; + } + } + + @Test + public void testEither() { + MapFunction<?, ?> function = new MapFunction<Either<String, Boolean>, Either<String, Boolean>>() { + @Override + public Either<String, Boolean> map(Either<String, Boolean> value) throws Exception { + return null; + } + }; + TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected); + Assert.assertEquals(expected, ti); + } + + @Test + public void testEitherHierarchy() { + MapFunction<?, ?> function = new EitherMapper<Boolean>(); + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.BOOLEAN_TYPE_INFO); + TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + Assert.assertEquals(expected, ti); + + function = new EitherMapper2(); + ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.STRING_TYPE_INFO); + expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO)); + Assert.assertEquals(expected, ti); + + function = new EitherMapper3(); + ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected); + Assert.assertEquals(expected, ti); + + Either<String, Tuple1<Integer>> either = new Either2(); + ti = TypeExtractor.getForObject(either); + Assert.assertEquals(expected, ti); + } + + @Test(expected=InvalidTypesException.class) + public void testEitherFromObjectException() { + Either<String, Tuple1<Integer>> either = Either.Left("test"); + TypeExtractor.getForObject(either); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java new file mode 100644 index 0000000..e225460 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.junit.Assert; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.types.BooleanValue; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.CharValue; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.ListValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.MapValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.ShortValue; +import org.apache.flink.types.StringValue; +import org.apache.hadoop.io.Writable; +import org.junit.Test; + +public class TypeInfoParserTest { + + @Test + public void testBasicTypes() { + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("Integer")); + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("Double")); + Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("Byte")); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("Float")); + Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("Short")); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("Long")); + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("Character")); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoParser.parse("String")); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("Boolean")); + Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("Void")); + Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("Date")); + + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer")); + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("java.lang.Double")); + Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("java.lang.Byte")); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("java.lang.Float")); + Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("java.lang.Short")); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("java.lang.Long")); + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("java.lang.Character")); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoParser.parse("java.lang.String")); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean")); + Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("java.lang.Void")); + Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("java.util.Date")); + + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("int")); + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("double")); + Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("byte")); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("float")); + Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("short")); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("long")); + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("char")); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("boolean")); + Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("void")); + } + + @Test + public void testValueTypes() { + helperValueType("StringValue", StringValue.class); + helperValueType("IntValue", IntValue.class); + helperValueType("ByteValue", ByteValue.class); + helperValueType("ShortValue", ShortValue.class); + helperValueType("CharValue", CharValue.class); + helperValueType("DoubleValue", DoubleValue.class); + helperValueType("FloatValue", FloatValue.class); + helperValueType("LongValue", LongValue.class); + helperValueType("BooleanValue", BooleanValue.class); + helperValueType("ListValue", ListValue.class); + helperValueType("MapValue", MapValue.class); + helperValueType("NullValue", NullValue.class); + } + + private static void helperValueType(String str, Class<?> clazz) { + TypeInformation<?> ti = TypeInfoParser.parse(str); + Assert.assertTrue(ti instanceof ValueTypeInfo); + ValueTypeInfo<?> vti = (ValueTypeInfo<?>) ti; + Assert.assertEquals(clazz, vti.getTypeClass()); + } + + @Test + public void testBasicArrays() { + Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Integer[]")); + Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Double[]")); + Assert.assertEquals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Byte[]")); + Assert.assertEquals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Float[]")); + Assert.assertEquals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Short[]")); + Assert.assertEquals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, TypeInfoParser.parse("Long[]")); + Assert.assertEquals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO, TypeInfoParser.parse("Character[]")); + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, TypeInfoParser.parse("String[]")); + Assert.assertEquals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, TypeInfoParser.parse("Boolean[]")); + + Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer[]")); + Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Double[]")); + Assert.assertEquals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Byte[]")); + Assert.assertEquals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Float[]")); + Assert.assertEquals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Short[]")); + Assert.assertEquals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Long[]")); + Assert.assertEquals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Character[]")); + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.String[]")); + Assert.assertEquals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean[]")); + + Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("int[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("double[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("byte[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("float[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("short[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("long[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("char[]")); + Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("boolean[]")); + } + + @Test + public void testTuples() { + TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<Integer, Long>"); + Assert.assertEquals(2, ti.getArity()); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(1)); + + ti = TypeInfoParser.parse("Tuple0"); + Assert.assertEquals(0, ti.getArity()); + Assert.assertEquals("Java Tuple0", ti.toString()); + + ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple0"); + Assert.assertEquals(0, ti.getArity()); + Assert.assertEquals("Java Tuple0", ti.toString()); + + ti = TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>"); + Assert.assertEquals("Java Tuple3<Java Tuple1<String>, Java Tuple1<Integer>, Java Tuple2<Long, Long>>", ti.toString()); + } + + @Test + public void testGenericType() { + TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class"); + Assert.assertTrue(ti instanceof GenericTypeInfo); + Assert.assertEquals(Class.class, ((GenericTypeInfo<?>) ti).getTypeClass()); + } + + public static class MyPojo { + public Integer basic; + public Tuple2<String, Integer> tuple; + public MyWritable hadoopCitizen; + public String[] array; + } + + @Test + public void testPojoType() { + TypeInformation<?> ti = TypeInfoParser.parse( + "org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<" + + "basic=Integer," + + "tuple=Tuple2<String, Integer>," + + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>," + + "array=String[]" + + ">"); + Assert.assertTrue(ti instanceof PojoTypeInfo); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; + Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo); + Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo); + Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo); + Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo); + } + + @Test + public void testPojoType2() { + TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<String,Tuple2<Integer,org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>>>"); + Assert.assertTrue(ti instanceof TupleTypeInfo); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo); + Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo); + TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>)(Object)tti.getTypeAt(1); + Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo); + Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1); + Assert.assertEquals("basic", pti.getPojoFieldAt(0).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicTypeInfo); + } + + public static class MyWritable implements Writable { + + @Override + public void write(DataOutput out) throws IOException { + + } + + @Override + public void readFields(DataInput in) throws IOException { + + } + + } + + @Test + public void testWritableType() { + TypeInformation<?> ti = TypeInfoParser.parse("Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>"); + Assert.assertTrue(ti instanceof WritableTypeInfo<?>); + Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass()); + } + + @Test + public void testObjectArrays() { + TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class[]"); + + Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>); + Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass()); + + TypeInformation<?> ti2 = TypeInfoParser.parse("Tuple2<Integer,Double>[]"); + + Assert.assertTrue(ti2 instanceof ObjectArrayTypeInfo<?, ?>); + Assert.assertTrue(((ObjectArrayTypeInfo<?, ?>) ti2).getComponentInfo() instanceof TupleTypeInfo); + + TypeInformation<?> ti3 = TypeInfoParser.parse("Tuple2<Integer[],Double>[]"); + Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString()); + } + + @Test + public void testLargeMixedTuple() { + TypeInformation<?> ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple4<Double,java.lang.Class[],StringValue,Tuple1<int>>[]"); + Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<StringValue>, Java Tuple1<Integer>>>", ti.toString()); + } + + public static enum MyEnum { + ONE, TWO, THREE + } + + @Test + public void testEnumType() { + TypeInformation<?> ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>"); + Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti.toString()); + + TypeInformation<?> ti2 = TypeInfoParser.parse("java.lang.Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>"); + Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti2.toString()); + } + + @Test + public void testException() { + try { + TypeInfoParser.parse("THIS_CLASS_DOES_NOT_EXIST"); + Assert.fail("exception expected"); + } catch (IllegalArgumentException e) { + // right + } + + try { + TypeInfoParser.parse("Tuple2<Integer>"); + Assert.fail("exception expected"); + } catch (IllegalArgumentException e) { + // right + } + + try { + TypeInfoParser.parse("Tuple3<Integer,,>"); + Assert.fail("exception expected"); + } catch (IllegalArgumentException e) { + // right + } + + try { + TypeInfoParser.parse("Tuple1<Integer,Double>"); + Assert.fail("exception expected"); + } catch (IllegalArgumentException e) { + // right + } + } + + @Test + public void testMultiDimensionalArray() { + // tuple + TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<Integer, Double>[][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple2<Integer, Double>>>", ti.toString()); + + // pojos + ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<" + + "PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo, fields = [basic: String]>" + + ">>>", ti.toString()); + + // basic types + ti = TypeInfoParser.parse("Float[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<Float>>>", ti.toString()); + ti = TypeInfoParser.parse("String[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<String>>>", ti.toString()); + ti = TypeInfoParser.parse("Date[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<Date>>>", ti.toString()); + + // primitive types + ti = TypeInfoParser.parse("int[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<int[]>>", ti.toString()); + ti = TypeInfoParser.parse("boolean[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<boolean[]>>", ti.toString()); + + // value types + ti = TypeInfoParser.parse("IntValue[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<ValueType<IntValue>>>>", ti.toString()); + + // writable types + ti = TypeInfoParser.parse("Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<" + + "WritableType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>" + + ">>>", ti.toString()); + + // enum types + ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>[][][]"); + Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<" + + "EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>" + + ">>>", ti.toString()); + } +}