http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java new file mode 100644 index 0000000..30710e5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.*; +import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer; +//CHECKSTYLE.ON: AvoidStarImport +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.types.Value; + +/** + * A {@link TypeInformation} for the tuple types of the Java API. + * + * @param <T> The type of the tuple. + */ +public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { + + private static final long serialVersionUID = 1L; + + protected final String[] fieldNames; + + @SuppressWarnings("unchecked") + public TupleTypeInfo(TypeInformation<?>... types) { + this((Class<T>) Tuple.getTupleClass(types.length), types); + } + + public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) { + super(tupleType, types); + + Preconditions.checkArgument( + types.length <= Tuple.MAX_ARITY, + "The tuple type exceeds the maximum supported arity."); + + this.fieldNames = new String[types.length]; + + for (int i = 0; i < types.length; i++) { + fieldNames[i] = "f" + i; + } + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public int getFieldIndex(String fieldName) { + int fieldIndex = Integer.parseInt(fieldName.substring(1)); + if (fieldIndex >= getArity()) { + return -1; + } + return fieldIndex; + } + + @SuppressWarnings("unchecked") + @Override + public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) { + if (getTypeClass() == Tuple0.class) { + return (TupleSerializer<T>) Tuple0Serializer.INSTANCE; + } + + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[getArity()]; + for (int i = 0; i < types.length; i++) { + fieldSerializers[i] = types[i].createSerializer(executionConfig); + } + + Class<T> tupleClass = getTypeClass(); + + return new TupleSerializer<T>(tupleClass, fieldSerializers); + } + + @Override + protected TypeComparatorBuilder<T> createTypeComparatorBuilder() { + return new TupleTypeComparatorBuilder(); + } + + private class TupleTypeComparatorBuilder implements TypeComparatorBuilder<T> { + + private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>(); + private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>(); + + @Override + public void initializeTypeComparatorBuilder(int size) { + fieldComparators.ensureCapacity(size); + logicalKeyFields.ensureCapacity(size); + } + + @Override + public void addComparatorField(int fieldId, TypeComparator<?> comparator) { + fieldComparators.add(comparator); + logicalKeyFields.add(fieldId); + } + + @Override + public TypeComparator<T> createTypeComparator(ExecutionConfig config) { + Preconditions.checkState( + fieldComparators.size() > 0, + "No field comparators were defined for the TupleTypeComparatorBuilder." + ); + + Preconditions.checkState( + logicalKeyFields.size() > 0, + "No key fields were defined for the TupleTypeComparatorBuilder." + ); + + Preconditions.checkState( + fieldComparators.size() == logicalKeyFields.size(), + "The number of field comparators and key fields is not equal." + ); + + final int maxKey = Collections.max(logicalKeyFields); + + Preconditions.checkState( + maxKey >= 0, + "The maximum key field must be greater or equal than 0." + ); + + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; + + for (int i = 0; i <= maxKey; i++) { + fieldSerializers[i] = types[i].createSerializer(config); + } + + return new TupleComparator<T>( + Ints.toArray(logicalKeyFields), + fieldComparators.toArray(new TypeComparator[fieldComparators.size()]), + fieldSerializers + ); + } + } + + // -------------------------------------------------------------------------------------------- + + @Override + public boolean equals(Object obj) { + if (obj instanceof TupleTypeInfo) { + @SuppressWarnings("unchecked") + TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj; + return other.canEqual(this) && + super.equals(other) && + Arrays.equals(fieldNames, other.fieldNames); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TupleTypeInfo; + } + + @Override + public int hashCode() { + return 31 * super.hashCode() + Arrays.hashCode(fieldNames); + } + + @Override + public String toString() { + return "Java " + super.toString(); + } + + // -------------------------------------------------------------------------------------------- + + public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>... basicTypes) { + if (basicTypes == null || basicTypes.length == 0) { + throw new IllegalArgumentException(); + } + + TypeInformation<?>[] infos = new TypeInformation<?>[basicTypes.length]; + for (int i = 0; i < infos.length; i++) { + Class<?> type = basicTypes[i]; + if (type == null) { + throw new IllegalArgumentException("Type at position " + i + " is null."); + } + + TypeInformation<?> info = BasicTypeInfo.getInfoFor(type); + if (info == null) { + throw new IllegalArgumentException("Type at position " + i + " is not a basic type."); + } + infos[i] = info; + } + + @SuppressWarnings("unchecked") + TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new TupleTypeInfo<Tuple>(infos); + return tupleInfo; + } + + @SuppressWarnings("unchecked") + public static <X extends Tuple> TupleTypeInfo<X> getBasicAndBasicValueTupleTypeInfo(Class<?>... basicTypes) { + if (basicTypes == null || basicTypes.length == 0) { + throw new IllegalArgumentException(); + } + + TypeInformation<?>[] infos = new TypeInformation<?>[basicTypes.length]; + for (int i = 0; i < infos.length; i++) { + Class<?> type = basicTypes[i]; + if (type == null) { + throw new IllegalArgumentException("Type at position " + i + " is null."); + } + + TypeInformation<?> info = BasicTypeInfo.getInfoFor(type); + if (info == null) { + try { + info = ValueTypeInfo.getValueTypeInfo((Class<Value>) type); + if (!((ValueTypeInfo<?>) info).isBasicValueType()) { + throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type."); + } + } catch (ClassCastException | InvalidTypesException e) { + throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type.", e); + } + } + infos[i] = info; + } + + + return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java new file mode 100644 index 0000000..753eb66 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.base.Preconditions; + +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; + +public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { + + private static final long serialVersionUID = 1L; + + private final static String REGEX_FIELD = "(f?)([0-9]+)"; + private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + +"|\\"+ ExpressionKeys.SELECT_ALL_CHAR + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD); + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + + // -------------------------------------------------------------------------------------------- + + protected final TypeInformation<?>[] types; + + private final int totalFields; + + public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) { + super(tupleType); + + this.types = Preconditions.checkNotNull(types); + + int fieldCounter = 0; + + for(TypeInformation<?> type : types) { + fieldCounter += type.getTotalFields(); + } + + totalFields = fieldCounter; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return true; + } + + public boolean isCaseClass() { + return false; + } + + @Override + public int getArity() { + return types.length; + } + + @Override + public int getTotalFields() { + return totalFields; + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + if (!matcher.matches()) { + throw new InvalidFieldReferenceException("Invalid tuple field reference \""+fieldExpression+"\"."); + } + + String field = matcher.group(0); + if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + // handle select all + int keyPosition = 0; + for (TypeInformation<?> type : types) { + if (type instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>) type; + cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; + } else { + result.add(new FlatFieldDescriptor(offset + keyPosition, type)); + } + keyPosition++; + } + } else { + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); + + if (!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); + } + + field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if (fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + "."); + } + TypeInformation<?> fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if (tail == null) { + if (fieldType instanceof CompositeType) { + // forward offsets + for (int i = 0; i < fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + // add all fields of composite type + ((CompositeType<?>) fieldType).getFlatFields("*", offset, result); + } else { + // we found the field to add + // compute flat field position by adding skipped fields + int flatFieldPos = offset; + for (int i = 0; i < fieldPos; i++) { + flatFieldPos += this.getTypeAt(i).getTotalFields(); + } + result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); + } + } else { + if (fieldType instanceof CompositeType<?>) { + // forward offset + for (int i = 0; i < fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result); + } else { + throw new InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + "."); + } + } + } + } + + @Override + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if(!matcher.matches()) { + if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of tuple field expression \""+fieldExpression+"\"."); + } + } + + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); + if(!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); + } + String field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if(fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \""+fieldStr+"\" out of bounds of "+this.toString()+"."); + } + TypeInformation<X> fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if(tail == null) { + // we found the type + return fieldType; + } else { + if(fieldType instanceof CompositeType<?>) { + return ((CompositeType<?>) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + } + } + } + + @Override + public <X> TypeInformation<X> getTypeAt(int pos) { + if (pos < 0 || pos >= this.types.length) { + throw new IndexOutOfBoundsException(); + } + + @SuppressWarnings("unchecked") + TypeInformation<X> typed = (TypeInformation<X>) this.types[pos]; + return typed; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TupleTypeInfoBase) { + @SuppressWarnings("unchecked") + TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj; + + return other.canEqual(this) && + super.equals(other) && + Arrays.equals(types, other.types) && + totalFields == other.totalFields; + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TupleTypeInfoBase; + } + + @Override + public int hashCode() { + return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + totalFields; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder("Tuple"); + bld.append(types.length); + if (types.length > 0) { + bld.append('<').append(types[0]); + + for (int i = 1; i < types.length; i++) { + bld.append(", ").append(types[i]); + } + + bld.append('>'); + } + return bld.toString(); + } + + @Override + public boolean hasDeterministicFieldOrder() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java new file mode 100644 index 0000000..d4ea24c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -0,0 +1,1692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.specific.SpecificRecordBase; + +import org.apache.commons.lang3.ClassUtils; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.types.Either; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; + +import org.apache.hadoop.io.Writable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A utility for reflection analysis on classes, to determine the return type of implementations of transformation + * functions. + */ +public class TypeExtractor { + + /* + * NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy". + * The type hierarchy describes all types (Classes, ParameterizedTypes, TypeVariables etc. ) and intermediate + * types from a given type of a function or type (e.g. MyMapper, Tuple2) until a current type + * (depends on the method, e.g. MyPojoFieldType). + * + * Thus, it fully qualifies types until tuple/POJO field level. + * + * A typical typeHierarchy could look like: + * + * UDF: MyMapFunction.class + * top-level UDF: MyMapFunctionBase.class + * RichMapFunction: RichMapFunction.class + * MapFunction: MapFunction.class + * Function's OUT: Tuple1<MyPojo> + * user-defined POJO: MyPojo.class + * user-defined top-level POJO: MyPojoBase.class + * POJO field: Tuple1<String> + * Field type: String.class + * + */ + + private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); + + protected TypeExtractor() { + // only create instances for special use cases + } + + // -------------------------------------------------------------------------------------------- + // Function specific methods + // -------------------------------------------------------------------------------------------- + + public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) { + return getMapReturnTypes(mapInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing); + } + + + public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) { + return getFlatMapReturnTypes(flatMapInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing); + } + + public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType) + { + return getFoldReturnTypes(foldInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing); + } + + + public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) { + return getMapPartitionReturnTypes(mapPartitionInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing); + } + + + public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType) { + return getGroupReduceReturnTypes(groupReduceInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing); + } + + public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) { + return getGroupCombineReturnTypes(combineInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType, + String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing); + } + + + public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) + { + return getFlatJoinReturnTypes(joinInterface, in1Type, in2Type, null, false); + } + + public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, + in1Type, in2Type, functionName, allowMissing); + } + + + public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) + { + return getJoinReturnTypes(joinInterface, in1Type, in2Type, null, false); + } + + public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, + in1Type, in2Type, functionName, allowMissing); + } + + + public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) + { + return getCoGroupReturnTypes(coGroupInterface, in1Type, in2Type, null, false); + } + + public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, + in1Type, in2Type, functionName, allowMissing); + } + + + public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) + { + return getCrossReturnTypes(crossInterface, in1Type, in2Type, null, false); + } + + public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) + { + return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, + in1Type, in2Type, functionName, allowMissing); + } + + + public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) { + return getKeySelectorTypes(selectorInterface, inType, null, false); + } + + public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, + TypeInformation<IN> inType, String functionName, boolean allowMissing) + { + return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing); + } + + + public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner) { + return getPartitionerTypes(partitioner, null, false); + } + + public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) { + return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null); + } + + + @SuppressWarnings("unchecked") + public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) { + if (inputFormatInterface instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<IN>) inputFormatInterface).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo(InputFormat.class, inputFormatInterface.getClass(), 0, null, null); + } + + // -------------------------------------------------------------------------------------------- + // Generic extraction methods + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass, + boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType, + String functionName, boolean allowMissing) + { + try { + final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); + if (m != null) { + // check for lambda type erasure + validateLambdaGenericParameters(m); + + // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function + final int paramLen = m.getGenericParameterTypes().length - 1; + final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<OUT>) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null); + } + else { + validateInputType(baseClass, function.getClass(), 0, inType); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<OUT>) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null); + } + } + catch (InvalidTypesException e) { + if (allowMissing) { + return (TypeInformation<OUT>) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e); + } else { + throw e; + } + } + } + + @SuppressWarnings("unchecked") + public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass, + boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, + String functionName, boolean allowMissing) + { + try { + final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); + if (m != null) { + // check for lambda type erasure + validateLambdaGenericParameters(m); + + // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function + final int paramLen = m.getGenericParameterTypes().length - 1; + final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1]; + final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type); + validateInputType((hasIterables)? removeGenericWrapper(input2) : input2, in2Type); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<OUT>) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), in1Type, in2Type); + } + else { + validateInputType(baseClass, function.getClass(), 0, in1Type); + validateInputType(baseClass, function.getClass(), 1, in2Type); + if(function instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<OUT>) function).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type); + } + } + catch (InvalidTypesException e) { + if (allowMissing) { + return (TypeInformation<OUT>) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e); + } else { + throw e; + } + } + } + + // -------------------------------------------------------------------------------------------- + // Create type information + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + public static <T> TypeInformation<T> createTypeInfo(Class<T> type) { + return (TypeInformation<T>) createTypeInfo((Type) type); + } + + public static TypeInformation<?> createTypeInfo(Type t) { + TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t); + if (ti == null) { + throw new InvalidTypesException("Could not extract type information."); + } + return ti; + } + + /** + * Creates a {@link TypeInformation} from the given parameters. + * + * If the given {@code instance} implements {@link ResultTypeQueryable}, its information + * is used to determine the type information. Otherwise, the type information is derived + * based on the given class information. + * + * @param instance instance to determine type information for + * @param baseClass base class of {@code instance} + * @param clazz class of {@code instance} + * @param returnParamPos index of the return type in the type arguments of {@code clazz} + * @param <OUT> output type + * @return type information + */ + @SuppressWarnings("unchecked") + public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) { + if (instance instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<OUT>) instance).getProducedType(); + } else { + return createTypeInfo(baseClass, clazz, returnParamPos, null, null); + } + } + + public static <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + TypeInformation<OUT> ti = new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type); + if (ti == null) { + throw new InvalidTypesException("Could not extract type information."); + } + return ti; + } + + // ----------------------------------- private methods ---------------------------------------- + + private TypeInformation<?> privateCreateTypeInfo(Type t) { + ArrayList<Type> typeHierarchy = new ArrayList<Type>(); + typeHierarchy.add(t); + return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null); + } + + // for (Rich)Functions + @SuppressWarnings("unchecked") + private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + ArrayList<Type> typeHierarchy = new ArrayList<Type>(); + Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos); + + TypeInformation<OUT> typeInfo; + + // return type is a variable -> try to get the type info from the input directly + if (returnType instanceof TypeVariable<?>) { + typeInfo = (TypeInformation<OUT>) createTypeInfoFromInputs((TypeVariable<?>) returnType, typeHierarchy, in1Type, in2Type); + + if (typeInfo != null) { + return typeInfo; + } + } + + // get info from hierarchy + return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); + } + + // for LambdaFunctions + @SuppressWarnings("unchecked") + private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + ArrayList<Type> typeHierarchy = new ArrayList<Type>(); + + // get info from hierarchy + return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + + // check if type is a subclass of tuple + if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) { + Type curT = t; + + // do not allow usage of Tuple as type + if (typeToClass(t).equals(Tuple.class)) { + throw new InvalidTypesException( + "Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead."); + } + + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) + // collect the types while moving up for a later top-down + while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) { + typeHierarchy.add(curT); + curT = typeToClass(curT).getGenericSuperclass(); + } + + if(curT == Tuple0.class) { + return new TupleTypeInfo(Tuple0.class, new TypeInformation<?>[0]); + } + + // check if immediate child of Tuple has generics + if (curT instanceof Class<?>) { + throw new InvalidTypesException("Tuple needs to be parameterized by using generics."); + } + + typeHierarchy.add(curT); + + // create the type information for the subtypes + TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type); + // type needs to be treated a pojo due to additional fields + if (subTypesInfo == null) { + if (t instanceof ParameterizedType) { + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); + } + else { + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type); + } + } + // return tuple info + return new TupleTypeInfo(typeToClass(t), subTypesInfo); + + } + // check if type is a subclass of Either + else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) { + Type curT = t; + + // go up the hierarchy until we reach Either (with or without generics) + // collect the types while moving up for a later top-down + while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) { + typeHierarchy.add(curT); + curT = typeToClass(curT).getGenericSuperclass(); + } + + // check if Either has generics + if (curT instanceof Class<?>) { + throw new InvalidTypesException("Either needs to be parameterized by using generics."); + } + + typeHierarchy.add(curT); + + // create the type information for the subtypes + TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type); + // type needs to be treated a pojo due to additional fields + if (subTypesInfo == null) { + if (t instanceof ParameterizedType) { + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type); + } + else { + return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type); + } + } + // return either info + return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]); + } + // type depends on another type + // e.g. class MyMapper<E> extends MapFunction<String, E> + else if (t instanceof TypeVariable) { + Type typeVar = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) t); + + if (!(typeVar instanceof TypeVariable)) { + return createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type); + } + // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt + else { + TypeInformation<OUT> typeInfo = (TypeInformation<OUT>) createTypeInfoFromInputs((TypeVariable<?>) t, typeHierarchy, in1Type, in2Type); + if (typeInfo != null) { + return typeInfo; + } else { + throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) t).getName() + "' in '" + + ((TypeVariable<?>) t).getGenericDeclaration() + "' could not be determined. This is most likely a type erasure problem. " + + "The type extraction currently supports types with generic variables only in cases where " + + "all variables in the return type can be deduced from the input type(s)."); + } + } + } + // arrays with generics + else if (t instanceof GenericArrayType) { + GenericArrayType genericArray = (GenericArrayType) t; + + Type componentType = genericArray.getGenericComponentType(); + + // due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class + if (componentType instanceof Class) { + + Class<?> componentClass = (Class<?>) componentType; + String className; + // for int[], double[] etc. + if(componentClass.isPrimitive()) { + className = encodePrimitiveClass(componentClass); + } + // for String[], Integer[] etc. + else { + className = "L" + componentClass.getName() + ";"; + } + + Class<OUT> classArray; + try { + classArray = (Class<OUT>) Class.forName("[" + className); + } catch (ClassNotFoundException e) { + throw new InvalidTypesException("Could not convert GenericArrayType to Class."); + } + + return getForClass(classArray); + } else { + TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy( + typeHierarchy, + genericArray.getGenericComponentType(), + in1Type, + in2Type); + + Class<OUT> classArray; + + try { + String componentClassName = componentInfo.getTypeClass().getName(); + String resultingClassName; + + if (componentClassName.startsWith("[")) { + resultingClassName = "[" + componentClassName; + } else { + resultingClassName = "[L" + componentClassName + ";"; + } + classArray = (Class<OUT>) Class.forName(resultingClassName); + } catch (ClassNotFoundException e) { + throw new InvalidTypesException("Could not convert GenericArrayType to Class."); + } + + return ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo); + } + } + // objects with generics are treated as Class first + else if (t instanceof ParameterizedType) { + return (TypeInformation<OUT>) privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type); + } + // no tuple, no TypeVariable, no generic type + else if (t instanceof Class) { + return privateGetForClass((Class<OUT>) t, typeHierarchy); + } + + throw new InvalidTypesException("Type Information could not be created."); + } + + private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, + TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) { + + Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar); + + // variable could be resolved + if (!(matReturnTypeVar instanceof TypeVariable)) { + return createTypeInfoWithTypeHierarchy(returnTypeHierarchy, matReturnTypeVar, in1TypeInfo, in2TypeInfo); + } + else { + returnTypeVar = (TypeVariable<?>) matReturnTypeVar; + } + + // no input information exists + if (in1TypeInfo == null && in2TypeInfo == null) { + return null; + } + + // create a new type hierarchy for the input + ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>(); + // copy the function part of the type hierarchy + for (Type t : returnTypeHierarchy) { + if (isClassType(t) && Function.class.isAssignableFrom(typeToClass(t)) && typeToClass(t) != Function.class) { + inputTypeHierarchy.add(t); + } + else { + break; + } + } + ParameterizedType baseClass = (ParameterizedType) inputTypeHierarchy.get(inputTypeHierarchy.size() - 1); + + TypeInformation<?> info = null; + if (in1TypeInfo != null) { + // find the deepest type variable that describes the type of input 1 + Type in1Type = baseClass.getActualTypeArguments()[0]; + + info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in1Type, in1TypeInfo); + } + + if (info == null && in2TypeInfo != null) { + // find the deepest type variable that describes the type of input 2 + Type in2Type = baseClass.getActualTypeArguments()[1]; + + info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in2Type, in2TypeInfo); + } + + if (info != null) { + return info; + } + + return null; + } + + private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) { + TypeInformation<?> info = null; + + // the input is a type variable + if (inType instanceof TypeVariable) { + inType = materializeTypeVariable(inputTypeHierarchy, (TypeVariable<?>) inType); + info = findCorrespondingInfo(returnTypeVar, inType, inTypeInfo, inputTypeHierarchy); + } + // input is an array + else if (inType instanceof GenericArrayType) { + TypeInformation<?> componentInfo = null; + if (inTypeInfo instanceof BasicArrayTypeInfo) { + componentInfo = ((BasicArrayTypeInfo<?,?>) inTypeInfo).getComponentInfo(); + } + else if (inTypeInfo instanceof PrimitiveArrayTypeInfo) { + componentInfo = BasicTypeInfo.getInfoFor(inTypeInfo.getTypeClass().getComponentType()); + } + else if (inTypeInfo instanceof ObjectArrayTypeInfo) { + componentInfo = ((ObjectArrayTypeInfo<?,?>) inTypeInfo).getComponentInfo(); + } + info = createTypeInfoFromInput(returnTypeVar, inputTypeHierarchy, ((GenericArrayType) inType).getGenericComponentType(), componentInfo); + } + // the input is a tuple + else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) + && Tuple.class.isAssignableFrom(typeToClass(inType))) { + ParameterizedType tupleBaseClass; + + // get tuple from possible tuple subclass + while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) { + inputTypeHierarchy.add(inType); + inType = typeToClass(inType).getGenericSuperclass(); + } + inputTypeHierarchy.add(inType); + + // we can assume to be parameterized since we + // already did input validation + tupleBaseClass = (ParameterizedType) inType; + + Type[] tupleElements = tupleBaseClass.getActualTypeArguments(); + // go thru all tuple elements and search for type variables + for (int i = 0; i < tupleElements.length; i++) { + info = createTypeInfoFromInput(returnTypeVar, inputTypeHierarchy, tupleElements[i], ((TupleTypeInfo<?>) inTypeInfo).getTypeAt(i)); + if(info != null) { + break; + } + } + } + // the input is a pojo + else if (inTypeInfo instanceof PojoTypeInfo) { + // build the entire type hierarchy for the pojo + getTypeHierarchy(inputTypeHierarchy, inType, Object.class); + info = findCorrespondingInfo(returnTypeVar, inType, inTypeInfo, inputTypeHierarchy); + } + return info; + } + + /** + * Creates the TypeInformation for all elements of a type that expects a certain number of + * subtypes (e.g. TupleXX or Either). + * + * @param originalType most concrete subclass + * @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes) + * @param typeHierarchy necessary for type inference + * @param in1Type necessary for type inference + * @param in2Type necessary for type inference + * @return array containing TypeInformation of sub types or null if definingType contains + * more subtypes (fields) that defined + */ + private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType, + ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + Type[] subtypes = new Type[definingType.getActualTypeArguments().length]; + + // materialize possible type variables + for (int i = 0; i < subtypes.length; i++) { + // materialize immediate TypeVariables + if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) { + subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]); + } + // class or parameterized type + else { + subtypes[i] = definingType.getActualTypeArguments()[i]; + } + } + + TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length]; + for (int i = 0; i < subtypes.length; i++) { + ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy); + subTypeHierarchy.add(subtypes[i]); + // sub type could not be determined with materializing + // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt + if (subtypes[i] instanceof TypeVariable<?>) { + subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type); + + // variable could not be determined + if (subTypesInfo[i] == null) { + throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '" + + ((TypeVariable<?>) subtypes[i]).getGenericDeclaration() + + "' could not be determined. This is most likely a type erasure problem. " + + "The type extraction currently supports types with generic variables only in cases where " + + "all variables in the return type can be deduced from the input type(s)."); + } + } else { + subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type); + } + } + + Class<?> originalTypeAsClass = null; + if (isClassType(originalType)) { + originalTypeAsClass = typeToClass(originalType); + } + Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type"); + // check if the class we assumed to conform to the defining type so far is actually a pojo because the + // original type contains additional fields. + // check for additional fields. + int fieldCount = countFieldsInClass(originalTypeAsClass); + if(fieldCount > subTypesInfo.length) { + return null; + } + return subTypesInfo; + } + + // -------------------------------------------------------------------------------------------- + // Extract type parameters + // -------------------------------------------------------------------------------------------- + + public static Type getParameterType(Class<?> baseClass, Class<?> clazz, int pos) { + return getParameterType(baseClass, null, clazz, pos); + } + + private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) { + if (typeHierarchy != null) { + typeHierarchy.add(clazz); + } + Type[] interfaceTypes = clazz.getGenericInterfaces(); + + // search in interfaces for base class + for (Type t : interfaceTypes) { + Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos); + if (parameter != null) { + return parameter; + } + } + + // search in superclass for base class + Type t = clazz.getGenericSuperclass(); + Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos); + if (parameter != null) { + return parameter; + } + + throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " + + "Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point"); + } + + private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) { + // base class + if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) { + if (typeHierarchy != null) { + typeHierarchy.add(t); + } + ParameterizedType baseClassChild = (ParameterizedType) t; + return baseClassChild.getActualTypeArguments()[pos]; + } + // interface that extended base class as class or parameterized type + else if (t instanceof ParameterizedType && baseClass.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType())) { + if (typeHierarchy != null) { + typeHierarchy.add(t); + } + return getParameterType(baseClass, typeHierarchy, (Class<?>) ((ParameterizedType) t).getRawType(), pos); + } + else if (t instanceof Class<?> && baseClass.isAssignableFrom((Class<?>) t)) { + if (typeHierarchy != null) { + typeHierarchy.add(t); + } + return getParameterType(baseClass, typeHierarchy, (Class<?>) t, pos); + } + return null; + } + + // -------------------------------------------------------------------------------------------- + // Validate input + // -------------------------------------------------------------------------------------------- + + private static void validateInputType(Type t, TypeInformation<?> inType) { + ArrayList<Type> typeHierarchy = new ArrayList<Type>(); + try { + validateInfo(typeHierarchy, t, inType); + } + catch(InvalidTypesException e) { + throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); + } + } + + private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) { + ArrayList<Type> typeHierarchy = new ArrayList<Type>(); + + // try to get generic parameter + Type inType; + try { + inType = getParameterType(baseClass, typeHierarchy, clazz, inputParamPos); + } + catch (InvalidTypesException e) { + return; // skip input validation e.g. for raw types + } + + try { + validateInfo(typeHierarchy, inType, inTypeInfo); + } + catch(InvalidTypesException e) { + throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); + } + } + + @SuppressWarnings("unchecked") + private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) { + if (type == null) { + throw new InvalidTypesException("Unknown Error. Type is null."); + } + + if (typeInfo == null) { + throw new InvalidTypesException("Unknown Error. TypeInformation is null."); + } + + if (!(type instanceof TypeVariable<?>)) { + // check for basic type + if (typeInfo.isBasicType()) { + + TypeInformation<?> actual; + // check if basic type at all + if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) { + throw new InvalidTypesException("Basic type expected."); + } + // check if correct basic type + if (!typeInfo.equals(actual)) { + throw new InvalidTypesException("Basic type '" + typeInfo + "' expected but was '" + actual + "'."); + } + + } + // check for tuple + else if (typeInfo.isTupleType()) { + // check if tuple at all + if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) { + throw new InvalidTypesException("Tuple type expected."); + } + + // do not allow usage of Tuple as type + if (isClassType(type) && typeToClass(type).equals(Tuple.class)) { + throw new InvalidTypesException("Concrete subclass of Tuple expected."); + } + + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) + while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) { + typeHierarchy.add(type); + type = typeToClass(type).getGenericSuperclass(); + } + + if(type == Tuple0.class) { + return; + } + + // check if immediate child of Tuple has generics + if (type instanceof Class<?>) { + throw new InvalidTypesException("Parameterized Tuple type expected."); + } + + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) typeInfo; + + Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments(); + + if (subTypes.length != tti.getArity()) { + throw new InvalidTypesException("Tuple arity '" + tti.getArity() + "' expected but was '" + + subTypes.length + "'."); + } + + for (int i = 0; i < subTypes.length; i++) { + validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i)); + } + } + // check for Either + else if (typeInfo instanceof EitherTypeInfo) { + // check if Either at all + if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) { + throw new InvalidTypesException("Either type expected."); + } + + // go up the hierarchy until we reach Either (with or without generics) + while (!(isClassType(type) && typeToClass(type).equals(Either.class))) { + typeHierarchy.add(type); + type = typeToClass(type).getGenericSuperclass(); + } + + // check if Either has generics + if (type instanceof Class<?>) { + throw new InvalidTypesException("Parameterized Either type expected."); + } + + EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo; + Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments(); + validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType()); + validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType()); + } + // check for Writable + else if (typeInfo instanceof WritableTypeInfo<?>) { + // check if writable at all + if (!(type instanceof Class<?> && Writable.class.isAssignableFrom((Class<?>) type))) { + throw new InvalidTypesException("Writable type expected."); + } + + // check writable type contents + Class<?> clazz; + if (((WritableTypeInfo<?>) typeInfo).getTypeClass() != (clazz = (Class<?>) type)) { + throw new InvalidTypesException("Writable type '" + + ((WritableTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '" + + clazz.getCanonicalName() + "'."); + } + } + // check for primitive array + else if (typeInfo instanceof PrimitiveArrayTypeInfo) { + Type component; + // check if array at all + if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null) + && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { + throw new InvalidTypesException("Array type expected."); + } + if (component instanceof TypeVariable<?>) { + component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component); + if (component instanceof TypeVariable) { + return; + } + } + if (!(component instanceof Class<?> && ((Class<?>)component).isPrimitive())) { + throw new InvalidTypesException("Primitive component expected."); + } + } + // check for basic array + else if (typeInfo instanceof BasicArrayTypeInfo<?, ?>) { + Type component; + // check if array at all + if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null) + && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { + throw new InvalidTypesException("Array type expected."); + } + + if (component instanceof TypeVariable<?>) { + component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component); + if (component instanceof TypeVariable) { + return; + } + } + + validateInfo(typeHierarchy, component, ((BasicArrayTypeInfo<?, ?>) typeInfo).getComponentInfo()); + + } + // check for object array + else if (typeInfo instanceof ObjectArrayTypeInfo<?, ?>) { + // check if array at all + if (!(type instanceof Class<?> && ((Class<?>) type).isArray()) && !(type instanceof GenericArrayType)) { + throw new InvalidTypesException("Object array type expected."); + } + + // check component + Type component; + if (type instanceof Class<?>) { + component = ((Class<?>) type).getComponentType(); + } else { + component = ((GenericArrayType) type).getGenericComponentType(); + } + + if (component instanceof TypeVariable<?>) { + component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component); + if (component instanceof TypeVariable) { + return; + } + } + + validateInfo(typeHierarchy, component, ((ObjectArrayTypeInfo<?, ?>) typeInfo).getComponentInfo()); + } + // check for value + else if (typeInfo instanceof ValueTypeInfo<?>) { + // check if value at all + if (!(type instanceof Class<?> && Value.class.isAssignableFrom((Class<?>) type))) { + throw new InvalidTypesException("Value type expected."); + } + + TypeInformation<?> actual; + // check value type contents + if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) { + throw new InvalidTypesException("Value type '" + typeInfo + "' expected but was '" + actual + "'."); + } + } + // check for POJO + else if (typeInfo instanceof PojoTypeInfo) { + Class<?> clazz = null; + if (!(isClassType(type) && ((PojoTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) { + throw new InvalidTypesException("POJO type '" + + ((PojoTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '" + + clazz.getCanonicalName() + "'."); + } + } + // check for Enum + else if (typeInfo instanceof EnumTypeInfo) { + if (!(type instanceof Class<?> && Enum.class.isAssignableFrom((Class<?>) type))) { + throw new InvalidTypesException("Enum type expected."); + } + // check enum type contents + if (!(typeInfo.getTypeClass() == type)) { + throw new InvalidTypesException("Enum type '" + typeInfo.getTypeClass().getCanonicalName() + "' expected but was '" + + typeToClass(type).getCanonicalName() + "'."); + } + } + // check for generic object + else if (typeInfo instanceof GenericTypeInfo<?>) { + Class<?> clazz = null; + if (!(isClassType(type) && ((GenericTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) { + throw new InvalidTypesException("Generic object type '" + + ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '" + + clazz.getCanonicalName() + "'."); + } + } + } else { + type = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) type); + if (!(type instanceof TypeVariable)) { + validateInfo(typeHierarchy, type, typeInfo); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Utility methods + // -------------------------------------------------------------------------------------------- + + /** + * @return number of items with equal type or same raw type + */ + private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) { + int count = 0; + for (Type t : typeHierarchy) { + if (t == type || (isClassType(type) && t == typeToClass(type))) { + count++; + } + } + return count; + } + + /** + * @param curT : start type + * @return Type The immediate child of the top class + */ + private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) { + // skip first one + if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) { + curT = typeToClass(curT).getGenericSuperclass(); + } + while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) { + typeHierarchy.add(curT); + curT = typeToClass(curT).getGenericSuperclass(); + + if (curT == null) { + break; + } + } + return curT; + } + + private int countFieldsInClass(Class<?> clazz) { + int fieldCount = 0; + for(Field field : clazz.getFields()) { // get all fields + if( !Modifier.isStatic(field.getModifiers()) && + !Modifier.isTransient(field.getModifiers()) + ) { + fieldCount++; + } + } + return fieldCount; + } + + private static Type removeGenericWrapper(Type t) { + if(t instanceof ParameterizedType && + (Collector.class.isAssignableFrom(typeToClass(t)) + || Iterable.class.isAssignableFrom(typeToClass(t)))) { + return ((ParameterizedType) t).getActualTypeArguments()[0]; + } + return t; + } + + private static void validateLambdaGenericParameters(Method m) { + // check the arguments + for (Type t : m.getGenericParameterTypes()) { + validateLambdaGenericParameter(t); + } + + // check the return type + validateLambdaGenericParameter(m.getGenericReturnType()); + } + + private static void validateLambdaGenericParameter(Type t) { + if(!(t instanceof Class)) { + return; + } + final Class<?> clazz = (Class<?>) t; + + if(clazz.getTypeParameters().length > 0) { + throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. \n" + + "It seems that your compiler has not stored them into the .class file. \n" + + "Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. \n" + + "See the documentation for more information about how to compile jobs containing lambda expressions."); + } + } + + private static String encodePrimitiveClass(Class<?> primitiveClass) { + if (primitiveClass == boolean.class) { + return "Z"; + } + else if (primitiveClass == byte.class) { + return "B"; + } + else if (primitiveClass == char.class) { + return "C"; + } + else if (primitiveClass == double.class) { + return "D"; + } + else if (primitiveClass == float.class) { + return "F"; + } + else if (primitiveClass == int.class) { + return "I"; + } + else if (primitiveClass == long.class) { + return "J"; + } + else if (primitiveClass == short.class) { + return "S"; + } + throw new InvalidTypesException(); + } + + private static TypeInformation<?> findCorrespondingInfo(TypeVariable<?> typeVar, Type type, TypeInformation<?> corrInfo, ArrayList<Type> typeHierarchy) { + if (sameTypeVars(type, typeVar)) { + return corrInfo; + } + else if (type instanceof TypeVariable && sameTypeVars(materializeTypeVariable(typeHierarchy, (TypeVariable<?>) type), typeVar)) { + return corrInfo; + } + else if (type instanceof GenericArrayType) { + TypeInformation<?> componentInfo = null; + if (corrInfo instanceof BasicArrayTypeInfo) { + componentInfo = ((BasicArrayTypeInfo<?,?>) corrInfo).getComponentInfo(); + } + else if (corrInfo instanceof PrimitiveArrayTypeInfo) { + componentInfo = BasicTypeInfo.getInfoFor(corrInfo.getTypeClass().getComponentType()); + } + else if (corrInfo instanceof ObjectArrayTypeInfo) { + componentInfo = ((ObjectArrayTypeInfo<?,?>) corrInfo).getComponentInfo(); + } + TypeInformation<?> info = findCorrespondingInfo(typeVar, ((GenericArrayType) type).getGenericComponentType(), componentInfo, typeHierarchy); + if (info != null) { + return info; + } + } + else if (corrInfo instanceof TupleTypeInfo + && type instanceof ParameterizedType + && Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) type).getRawType())) { + ParameterizedType tuple = (ParameterizedType) type; + Type[] args = tuple.getActualTypeArguments(); + + for (int i = 0; i < args.length; i++) { + TypeInformation<?> info = findCorrespondingInfo(typeVar, args[i], ((TupleTypeInfo<?>) corrInfo).getTypeAt(i), typeHierarchy); + if (info != null) { + return info; + } + } + } + else if (corrInfo instanceof PojoTypeInfo && isClassType(type)) { + // determine a field containing the type variable + List<Field> fields = getAllDeclaredFields(typeToClass(type)); + for (Field field : fields) { + Type fieldType = field.getGenericType(); + if (fieldType instanceof TypeVariable + && sameTypeVars(typeVar, materializeTypeVariable(typeHierarchy, (TypeVariable<?>) fieldType))) { + return getTypeOfPojoField(corrInfo, field); + } + else if (fieldType instanceof ParameterizedType + || fieldType instanceof GenericArrayType) { + ArrayList<Type> typeHierarchyWithFieldType = new ArrayList<Type>(typeHierarchy); + typeHierarchyWithFieldType.add(fieldType); + TypeInformation<?> info = findCorrespondingInfo(typeVar, fieldType, getTypeOfPojoField(corrInfo, field), typeHierarchyWithFieldType); + if (info != null) { + return info; + } + } + } + } + return null; + } + + /** + * Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards. + * If a value could not be found it will return the most bottom type variable in the hierarchy. + */ + private static Type materializeTypeVariable(ArrayList<Type> typeHierarchy, TypeVariable<?> typeVar) { + TypeVariable<?> inTypeTypeVar = typeVar; + // iterate thru hierarchy from top to bottom until type variable gets a class assigned + for (int i = typeHierarchy.size() - 1; i >= 0; i--) { + Type curT = typeHierarchy.get(i); + + // parameterized type + if (curT instanceof ParameterizedType) { + Class<?> rawType = ((Class<?>) ((ParameterizedType) curT).getRawType()); + + for (int paramIndex = 0; paramIndex < rawType.getTypeParameters().length; paramIndex++) { + + TypeVariable<?> curVarOfCurT = rawType.getTypeParameters()[paramIndex]; + + // check if variable names match + if (sameTypeVars(curVarOfCurT, inTypeTypeVar)) { + Type curVarType = ((ParameterizedType) curT).getActualTypeArguments()[paramIndex]; + + // another type variable level + if (curVarType instanceof TypeVariable<?>) { + inTypeTypeVar = (TypeVariable<?>) curVarType; + } + // class + else { + return curVarType; + } + } + } + } + } + // can not be materialized, most likely due to type erasure + // return the type variable of the deepest level + return inTypeTypeVar; + } + + /** + * Creates type information from a given Class such as Integer, String[] or POJOs. + * + * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies. + * In most cases {@link TypeExtractor#createTypeInfo(Type)} is the recommended method for type extraction + * (a Class is a child of Type). + * + * @param clazz a Class to create TypeInformation for + * @return TypeInformation that describes the passed Class + */ + public static <X> TypeInformation<X> getForClass(Class<X> clazz) { + return new TypeExtractor().privateGetForClass(clazz, new ArrayList<Type>()); + } + + private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) { + return privateGetForClass(clazz, typeHierarchy, null, null, null); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy, + ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + Preconditions.checkNotNull(clazz); + + if (clazz.equals(Object.class)) { + return new GenericTypeInfo<OUT>(clazz); + } + + // check for arrays + if (clazz.isArray()) { + + // primitive arrays: int[], byte[], ... + PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo = PrimitiveArrayTypeInfo.getInfoFor(clazz); + if (primitiveArrayInfo != null) { + return primitiveArrayInfo; + } + + // basic type arrays: String[], Integer[], Double[] + BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz); + if (basicArrayInfo != null) { + return basicArrayInfo; + } + + // object arrays + else { + TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy( + typeHierarchy, + clazz.getComponentType(), + in1Type, + in2Type); + + return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo); + } + } + + // check for writable types + if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) { + return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz); + } + + // check for basic types + TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz); + if (basicTypeInfo != null) { + return basicTypeInfo; + } + + // check for subclasses of Value + if (Value.class.isAssignableFrom(clazz)) { + Class<? extends Value> valueClass = clazz.asSubclass(Value.class); + return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass); + } + + // check for subclasses of Tuple + if (Tuple.class.isAssignableFrom(clazz)) { + if(clazz == Tuple0.class) { + return new TupleTypeInfo(Tuple0.class); + } + throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class."); + } + + // check for subclasses of Either + if (Either.class.isAssignableFrom(clazz)) { + throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class."); + } + + // check for Enums + if(Enum.class.isAssignableFrom(clazz)) { + return new EnumTypeInfo(clazz); + } + + // special case for POJOs generated by Avro. + if(SpecificRecordBase.class.isAssignableFrom(clazz)) { + return new AvroTypeInfo(clazz); + } + + if (countTypeInHierarchy(typeHierarchy, clazz) > 1) { + return new GenericTypeInfo<OUT>(clazz); + } + + if (Modifier.isInterface(clazz.getModifiers())) { + // Interface has no members and is therefore not handled as POJO + return new GenericTypeInfo<OUT>(clazz); + } + + if (clazz.equals(Class.class)) { + // special case handling for Class, this should not be handled by the POJO logic + return new GenericTypeInfo<OUT>(clazz); + } + + try { + TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type); + if (pojoType != null) { + return pojoType; + } + } catch (InvalidTypesException e) { + if(LOG.isDebugEnabled()) { + LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e); + } + // ignore and create generic type info + } + + // return a generic type + return new GenericTypeInfo<OUT>(clazz); + } + + /** + * Checks if the given field is a valid pojo field: + * - it is public + * OR + * - there are getter and setter methods for the field. + * + * @param f field to check + * @param clazz class of field + * @param typeHierarchy type hierarchy for materializing generic types + */ + private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) { + if(Modifier.isPublic(f.getModifiers())) { + return true; + } else { + boolean hasGetter = false, hasSetter = false; + final String fieldNameLow = f.getName().toLowerCase().replaceAll("_", ""); + + Type fieldType = f.getGenericType(); + Class<?> fieldTypeWrapper = ClassUtils.primitiveToWrapper(f.getType()); + + TypeVariable<?> fieldTypeGeneric = null; + if(fieldType instanceof TypeVariable) { + fieldTypeGeneric = (TypeVariable<?>) fieldType; + fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType); + } + for(Method m : clazz.getMethods()) { + final String methodNameLow = m.getName().endsWith("_$eq") ? + m.getName().toLowerCase().replaceAll("_", "").replaceFirst("\\$eq$", "_\\$eq") : + m.getName().toLowerCase().replaceAll("_", ""); + + // check for getter + if( // The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields. + (methodNameLow.equals("get"+fieldNameLow) || methodNameLow.equals("is"+fieldNameLow) || methodNameLow.equals(fieldNameLow)) && + // no arguments for the getter + m.getParameterTypes().length == 0 && + // return type is same as field type (or the generic variant of it) + (m.getGenericReturnType().equals( fieldType ) || (fieldTypeWrapper != null && m.getReturnType().equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) ) + ) { + if(hasGetter) { + throw new IllegalStateException("Detected more than one getter"); + } + hasGetter = true; + } + // check for setters (<FieldName>_$eq for scala) + if((methodNameLow.equals("set"+fieldNameLow) || methodNameLow.equals(fieldNameLow+"_$eq")) && + m.getParameterTypes().length == 1 && // one parameter of the field's type + (m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeWrapper != null && m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&& + // return type is void. + m.getReturnType().equals(Void.TYPE) + ) { + if(hasSetter) { + throw new IllegalStateException("Detected more than one setter"); + } + hasSetter = true; + } + } + if(hasGetter && hasSetter) { + return true; + } else { + if(!hasGetter) { + LOG.debug(clazz+" does not contain a getter for field "+f.getName() ); + } + if(!hasSetter) { + LOG.debug(clazz+" does not contain a setter for field "+f.getName() ); + } + return false; + } + } + } + + @SuppressWarnings("unchecked") + protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, + ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + + if (!Modifier.isPublic(clazz.getModifiers())) { + LOG.info("Class " + clazz.getName() + " is not public, cannot treat it as a POJO type. Will be handled as GenericType"); + return new GenericTypeInfo<OUT>(clazz); + } + + // add the hierarchy of the POJO itself if it is generic + if (parameterizedType != null) { + getTypeHierarchy(typeHierarchy, parameterizedType, Object.class); + } + // create a type hierarchy, if the incoming only contains the most bottom one or none. + else if (typeHierarchy.size() <= 1) { + getTypeHierarchy(typeHierarchy, clazz, Object.class); + } + + List<Field> fields = getAllDeclaredFields(clazz); + if (fields.size() == 0) { + LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType"); + return new GenericTypeInfo<OUT>(clazz); + } + + List<PojoField> pojoFields = new ArrayList<PojoField>(); + for (Field field : fields) { + Type fieldType = field.getGenericType(); + if(!isValidPojoField(field, clazz, typeHierarchy)) { + LOG.info(clazz + " is not a valid POJO type"); + return null; + } + try { + ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy); + fieldTypeHierarchy.add(fieldType); + TypeInformation<?> ti = createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, in1Type, in2Type); + pojoFields.add(new PojoField(field, ti)); + } catch (InvalidTypesException e) { + Class<?> genericClass = Object.class; + if(isClassType(fieldType)) { + genericClass = typeToClass(fieldType); + } + pojoFields.add(new PojoField(field, new GenericTypeInfo<OUT>((Class<OUT>) genericClass))); + } + } + + CompositeType<OUT> pojoType = new PojoTypeInfo<OUT>(clazz, pojoFields); + + // + // Validate the correctness of the pojo. + // returning "null" will result create a generic type information. + // + List<Method> methods = getAllDeclaredMethods(clazz); + for (Method method : methods) { + if (method.getName().equals("readObject") || method.getName().equals("writeObject")) { + LOG.info(clazz+" contains custom serialization methods we do not call."); + return null; + } + } + + // Try retrieving the default constructor, if it does not have one + // we cannot use this because the serializer uses it. + Constructor defaultConstructor = null; + try { + defaultConstructor = clazz.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) { + LOG.info(clazz + " is abstract or an interface, having a concrete " + + "type can increase performance."); + } else { + LOG.info(clazz + " must have a default constructor to be used as a POJO."); + return null; + } + } + if(defaultConstructor != null && !Modifier.isPublic(defaultConstructor.getModifiers())) { + LOG.info("The default constructor of " + clazz + " should be Public to be used as a POJO."); + return null; + } + + // everything is checked, we return the pojo + return pojoType; + } + + /** + * recursively determine all declared fields + * This is required because class.getFields() is not returning fields defined + * in parent classes. + */ + public static List<Field> getAllDeclaredFields(Class<?> clazz) { + List<Field> result = new ArrayList<Field>(); + while (clazz != null) { + Field[] fields = clazz.getDeclaredFields(); + for (Field field : fields) { + if(Modifier.isTransient(field.getModifiers()) || Modifier.isStatic(field.getModifiers())) { + continue; // we have no use for transient or static fields + } + if(hasFieldWithSameName(field.getName(), result)) { + throw new RuntimeException("The field "+field+" is already contained in the hierarchy of the "+clazz+"." + + "Please use unique field names through your classes hierarchy"); + } + result.add(field); + } + clazz = clazz.getSuperclass(); + } + return result; + } + + public static Field getDeclaredField(Class<?> clazz, String name) { + for (Field field : getAllDeclaredFields(clazz)) { + if (field.getName().equals(name)) { + return field; + } + } + return null; + } + + private static boolean hasFieldWithSameName(String name, List<Field> fields) { + for(Field field : fields) { + if(name.equals(field.getName())) { + return true; + } + } + return false; + } + + + // recursively determine all declared methods + private static List<Method> getAllDeclaredMethods(Class<?> clazz) { + List<Method> result = new ArrayList<Method>(); + while (clazz != null) { + Method[] methods = clazz.getDeclaredMethods(); + for (Method method : methods) { + result.add(method); + } + clazz = clazz.getSuperclass(); + } + return result; + } + + // not public to users + public static Class<?> typeToClass(Type t) { + if (t instanceof Class) { + return (Class<?>)t; + } + else if (t instanceof ParameterizedType) { + return ((Class<?>)((ParameterizedType) t).getRawType()); + } + throw new IllegalArgumentException("Cannot convert type to class"); + } + + // not public to users + public static boolean isClassType(Type t) { + return t instanceof Class<?> || t instanceof ParameterizedType; + } + + private static boolean sameTypeVars(Type t1, Type t2) { + if (!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable)) { + return false; + } + return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName()) + && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration()); + } + + private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) { + for (int j = 0; j < pojoInfo.getArity(); j++) { + PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j); + if (pf.getField().getName().equals(field.getName())) { + return pf.getTypeInformation(); + } + } + return null; + } + + + public static <X> TypeInformation<X> getForObject(X value) { + return new TypeExtractor().privateGetForObject(value); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <X> TypeInformation<X> privateGetForObject(X value) { + Preconditions.checkNotNull(value); + + // check if we can extract the types from tuples, otherwise work with the class + if (value instanceof Tuple) { + Tuple t = (Tuple) value; + int numFields = t.getArity(); + if(numFields != countFieldsInClass(value.getClass())) { + // not a tuple since it has more fields. + return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo here, because + // there is currently no other type that can handle such a class. + } + + TypeInformation<?>[] infos = new TypeInformation[numFields]; + for (int i = 0; i < numFields; i++) { + Object field = t.getField(i); + + if (field == null) { + throw new InvalidTypesException("Automatic type extraction is not possible on candidates with null values. " + + "Please specify the types directly."); + } + + infos[i] = privateGetForObject(field); + } + return new TupleTypeInfo(value.getClass(), infos); + } + // we can not extract the types from an Either object since it only contains type information + // of one type, but from Either classes + else if (value instanceof Either) { + try { + return (TypeInformation<X>) privateCreateTypeInfo(value.getClass()); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Automatic type extraction is not possible on an Either type " + + "as it does not contain information about both possible types. " + + "Please specify the types directly."); + } + } + else { + return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>()); + } + } +}