http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java index 024eb71..de59c36 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -38,12 +39,12 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen private final Class<T> typeClass; public EnumTypeInfo(Class<T> typeClass) { - if (typeClass == null) { - throw new NullPointerException(); - } + Preconditions.checkNotNull(typeClass, "Enum type class must not be null."); + if (!Enum.class.isAssignableFrom(typeClass) ) { throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName()); } + this.typeClass = typeClass; } @@ -98,13 +99,22 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen @Override public int hashCode() { - return typeClass.hashCode() ^ 0xd3a2646c; + return typeClass.hashCode(); } - + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EnumTypeInfo; + } + @Override public boolean equals(Object obj) { if (obj instanceof EnumTypeInfo) { - return typeClass == ((EnumTypeInfo<?>) obj).typeClass; + @SuppressWarnings("unchecked") + EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj; + + return enumTypeInfo.canEqual(this) && + typeClass == enumTypeInfo.typeClass; } else { return false; }
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index 5caf8f2..7e7aa68 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -34,7 +35,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType private final Class<T> typeClass; public GenericTypeInfo(Class<T> typeClass) { - this.typeClass = typeClass; + this.typeClass = Preconditions.checkNotNull(typeClass); } @Override @@ -88,13 +89,21 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType @Override public int hashCode() { - return typeClass.hashCode() ^ 0x165667b1; + return typeClass.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof GenericTypeInfo; } @Override public boolean equals(Object obj) { - if (obj.getClass() == GenericTypeInfo.class) { - return typeClass == ((GenericTypeInfo<?>) obj).typeClass; + if (obj instanceof GenericTypeInfo) { + @SuppressWarnings("unchecked") + GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj; + + return typeClass == genericTypeInfo.typeClass; } else { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java index 227c68c..1dd7f01 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java @@ -31,8 +31,8 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> { private static final long serialVersionUID = -4212082837126702723L; - private String functionName; - private InvalidTypesException typeException; + private final String functionName; + private final InvalidTypesException typeException; public MissingTypeInfo(String functionName) { @@ -87,6 +87,34 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> { } @Override + public String toString() { + return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof MissingTypeInfo) { + MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj; + + return missingTypeInfo.canEqual(this) && + functionName.equals(missingTypeInfo.functionName) && + typeException.equals(missingTypeInfo.typeException); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * functionName.hashCode() + typeException.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof MissingTypeInfo; + } + + @Override public int getTotalFields() { throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java index 6806122..150c976 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java @@ -19,13 +19,9 @@ package org.apache.flink.api.java.typeutils; import java.lang.reflect.Array; -import java.lang.reflect.GenericArrayType; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; @@ -34,21 +30,12 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { private static final long serialVersionUID = 1L; - private final Type arrayType; - private final Type componentType; + private final Class<T> arrayType; private final TypeInformation<C> componentInfo; - @SuppressWarnings("unchecked") - private ObjectArrayTypeInfo(Type arrayType, Type componentType) { - this.arrayType = arrayType; - this.componentType = componentType; - this.componentInfo = (TypeInformation<C>) TypeExtractor.createTypeInfo(componentType); - } - - private ObjectArrayTypeInfo(Type arrayType, Type componentType, TypeInformation<C> componentInfo) { - this.arrayType = arrayType; - this.componentType = componentType; - this.componentInfo = componentInfo; + private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) { + this.arrayType = Preconditions.checkNotNull(arrayType); + this.componentInfo = Preconditions.checkNotNull(componentInfo); } // -------------------------------------------------------------------------------------------- @@ -76,29 +63,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { @SuppressWarnings("unchecked") @Override public Class<T> getTypeClass() { - // if arrayType is a Class - if (arrayType instanceof Class) { - return (Class<T>) arrayType; - } - - // if arrayType is a GenericArrayType - Class<?> componentClass = (Class<?>) ((ParameterizedType) componentType).getRawType(); - - try { - return (Class<T>) Class.forName("[L" + componentClass.getName() + ";"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot create non-generic type class.", e); - } - } - - public Type getType() { return arrayType; } - public Type getComponentType() { - return this.componentType; - } - public TypeInformation<C> getComponentInfo() { return componentInfo; } @@ -111,15 +78,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { @SuppressWarnings("unchecked") @Override public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { - // use raw type for serializer if generic array type - if (this.componentType instanceof GenericArrayType) { - ParameterizedType paramType = (ParameterizedType) ((GenericArrayType) this.componentType).getGenericComponentType(); - - return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) paramType.getRawType(), - this.componentInfo.createSerializer(executionConfig)); - } else { - return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) this.componentType, this.componentInfo.createSerializer(executionConfig)); - } + return (TypeSerializer<T>) new GenericArraySerializer<C>( + componentInfo.getTypeClass(), + componentInfo.createSerializer(executionConfig)); } @Override @@ -128,38 +89,37 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { + public boolean equals(Object obj) { + if (obj instanceof ObjectArrayTypeInfo) { + @SuppressWarnings("unchecked") + ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>)obj; + + return objectArrayTypeInfo.canEqual(this) && + arrayType == objectArrayTypeInfo.arrayType && + componentInfo.equals(objectArrayTypeInfo.componentInfo); + } else { return false; } + } - ObjectArrayTypeInfo<?, ?> that = (ObjectArrayTypeInfo<?, ?>) o; - return this.arrayType.equals(that.arrayType) && this.componentType.equals(that.componentType); + @Override + public boolean canEqual(Object obj) { + return obj instanceof ObjectArrayTypeInfo; } @Override public int hashCode() { - return 31 * this.arrayType.hashCode() + this.componentType.hashCode(); + return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode(); } // -------------------------------------------------------------------------------------------- - public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type, TypeInformation<C> componentInfo) { - // generic array type e.g. for Tuples - if (type instanceof GenericArrayType) { - GenericArrayType genericArray = (GenericArrayType) type; - return new ObjectArrayTypeInfo<T, C>(type, genericArray.getGenericComponentType(), componentInfo); - } - // for tuples without generics (e.g. generated by the TypeInformation parser) - // and multidimensional arrays (e.g. in scala) - else if (type instanceof Class<?> && ((Class<?>) type).isArray() - && BasicTypeInfo.getInfoFor((Class<?>) type) == null) { - return new ObjectArrayTypeInfo<T, C>(type, ((Class<?>) type).getComponentType(), componentInfo); - } - throw new InvalidTypesException("The given type is not a valid object array."); + public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) { + Preconditions.checkNotNull(arrayClass); + Preconditions.checkNotNull(componentInfo); + Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array."); + + return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo); } /** @@ -170,20 +130,12 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { * This must be used in cases where the complete type of the array is not available as a * {@link java.lang.reflect.Type} or {@link java.lang.Class}. */ + @SuppressWarnings("unchecked") public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) { - return new ObjectArrayTypeInfo<T, C>( - Array.newInstance(componentInfo.getTypeClass(), 0).getClass(), - componentInfo.getTypeClass(), - componentInfo); - } + Preconditions.checkNotNull(componentInfo); - @SuppressWarnings("unchecked") - public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type) { - // class type e.g. for POJOs - if (type instanceof Class<?> && ((Class<?>) type).isArray() && BasicTypeInfo.getInfoFor((Class<C>) type) == null) { - Class<C> array = (Class<C>) type; - return new ObjectArrayTypeInfo<T, C>(type, array.getComponentType()); - } - throw new InvalidTypesException("The given type is not a valid object array."); + return new ObjectArrayTypeInfo<T, C>( + (Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(), + componentInfo); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java index 91a7a5e..4ad0ac2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java @@ -23,16 +23,29 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.reflect.Field; +import java.util.Objects; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeinfo.TypeInformation; public class PojoField implements Serializable { - public transient Field field; - public TypeInformation<?> type; + + private static final long serialVersionUID = 1975295846436559363L; + + private transient Field field; + private final TypeInformation<?> type; public PojoField(Field field, TypeInformation<?> type) { - this.field = field; - this.type = type; + this.field = Preconditions.checkNotNull(field); + this.type = Preconditions.checkNotNull(type); + } + + public Field getField() { + return field; + } + + public TypeInformation<?> getTypeInformation() { + return type; } private void writeObject(ObjectOutputStream out) @@ -68,4 +81,25 @@ public class PojoField implements Serializable { public String toString() { return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")"; } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PojoField) { + PojoField other = (PojoField) obj; + + return other.canEqual(this) && type.equals(other.type) && + Objects.equals(field, other.field); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(field, type); + } + + public boolean canEqual(Object obj) { + return obj instanceof PojoField; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 273a907..f7e4e42 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -22,12 +22,12 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; @@ -40,8 +40,6 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import com.google.common.base.Joiner; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs, @@ -59,8 +57,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(PojoTypeInfo.class); - private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS @@ -70,31 +66,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> { private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); - private final Class<T> typeClass; - - private PojoField[] fields; + private final PojoField[] fields; - private int totalFields; + private final int totalFields; public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) { super(typeClass); - this.typeClass = typeClass; - List<PojoField> tempFields = new ArrayList<PojoField>(fields); - Collections.sort(tempFields, new Comparator<PojoField>() { + + Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()), + "POJO " + typeClass + " is not public"); + + this.fields = fields.toArray(new PojoField[fields.size()]); + + Arrays.sort(this.fields, new Comparator<PojoField>() { @Override public int compare(PojoField o1, PojoField o2) { - return o1.field.getName().compareTo(o2.field.getName()); + return o1.getField().getName().compareTo(o2.getField().getName()); } }); - this.fields = tempFields.toArray(new PojoField[tempFields.size()]); - - // check if POJO is public - if(!Modifier.isPublic(typeClass.getModifiers())) { - throw new RuntimeException("POJO "+typeClass+" is not public"); - } + + int counterFields = 0; + for(PojoField field : fields) { - totalFields += field.type.getTotalFields(); + counterFields += field.getTypeInformation().getTotalFields(); } + + totalFields = counterFields; } @Override @@ -119,11 +116,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> { } @Override - public Class<T> getTypeClass() { - return typeClass; - } - - @Override public boolean isSortKeyType() { // Support for sorting POJOs that implement Comparable is not implemented yet. // Since the order of fields in a POJO type is not well defined, sorting on fields @@ -145,12 +137,16 @@ public class PojoTypeInfo<T> extends CompositeType<T> { // handle select all int keyPosition = 0; for(PojoField pField : fields) { - if(pField.type instanceof CompositeType) { - CompositeType<?> cType = (CompositeType<?>)pField.type; + if(pField.getTypeInformation() instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>)pField.getTypeInformation(); cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); keyPosition += cType.getTotalFields()-1; } else { - result.add(new NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, pField.type)); + result.add( + new NamedFlatFieldDescriptor( + pField.getField().getName(), + offset + keyPosition, + pField.getTypeInformation())); } keyPosition++; } @@ -163,9 +159,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> { int fieldPos = -1; TypeInformation<?> fieldType = null; for (int i = 0; i < fields.length; i++) { - if (fields[i].field.getName().equals(field)) { + if (fields[i].getField().getName().equals(field)) { fieldPos = i; - fieldType = fields[i].type; + fieldType = fields[i].getTypeInformation(); break; } } @@ -181,7 +177,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> { } // add all fields of composite type ((CompositeType<?>) fieldType).getFlatFields("*", offset, result); - return; } else { // we found the field to add // compute flat field position by adding skipped fields @@ -190,8 +185,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> { flatFieldPos += this.getTypeAt(i).getTotalFields(); } result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); - // nothing left to do - return; } } else { if(fieldType instanceof CompositeType<?>) { @@ -200,8 +193,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> { offset += this.getTypeAt(i).getTotalFields(); } ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result); - // nothing left to do - return; } else { throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); } @@ -226,9 +217,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> { int fieldPos = -1; TypeInformation<?> fieldType = null; for (int i = 0; i < fields.length; i++) { - if (fields[i].field.getName().equals(field)) { + if (fields[i].getField().getName().equals(field)) { fieldPos = i; - fieldType = fields[i].type; + fieldType = fields[i].getTypeInformation(); break; } } @@ -255,10 +246,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> { throw new IndexOutOfBoundsException(); } @SuppressWarnings("unchecked") - TypeInformation<X> typed = (TypeInformation<X>) fields[pos].type; + TypeInformation<X> typed = (TypeInformation<X>) fields[pos].getTypeInformation(); return typed; } + @Override + protected TypeComparatorBuilder<T> createTypeComparatorBuilder() { + return new PojoTypeComparatorBuilder(); + } + // used for testing. Maybe use mockito here public PojoField getPojoFieldAt(int pos) { if (pos < 0 || pos >= this.fields.length) { @@ -267,42 +263,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> { return this.fields[pos]; } - /** - * Comparator creation - */ - private TypeComparator<?>[] fieldComparators; - private Field[] keyFields; - private int comparatorHelperIndex = 0; - @Override - protected void initializeNewComparator(int keyCount) { - fieldComparators = new TypeComparator<?>[keyCount]; - keyFields = new Field[keyCount]; - comparatorHelperIndex = 0; - } - - @Override - protected void addCompareField(int fieldId, TypeComparator<?> comparator) { - fieldComparators[comparatorHelperIndex] = comparator; - keyFields[comparatorHelperIndex] = fields[fieldId].field; - comparatorHelperIndex++; - } - - @Override - protected TypeComparator<T> getNewComparator(ExecutionConfig config) { - // first remove the null array fields - final Field[] finalKeyFields = Arrays.copyOf(keyFields, comparatorHelperIndex); - @SuppressWarnings("rawtypes") - final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex); - if(finalFieldComparators.length == 0 || finalKeyFields.length == 0 || finalFieldComparators.length != finalKeyFields.length) { - throw new IllegalArgumentException("Pojo comparator creation has a bug"); - } - return new PojoComparator<T>(finalKeyFields, finalFieldComparators, createSerializer(config), typeClass); - } - public String[] getFieldNames() { String[] result = new String[fields.length]; for (int i = 0; i < fields.length; i++) { - result[i] = fields[i].field.getName(); + result[i] = fields[i].getField().getName(); } return result; } @@ -310,7 +274,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> { @Override public int getFieldIndex(String fieldName) { for (int i = 0; i < fields.length; i++) { - if (fields[i].field.getName().equals(fieldName)) { + if (fields[i].getField().getName().equals(fieldName)) { return i; } } @@ -320,46 +284,106 @@ public class PojoTypeInfo<T> extends CompositeType<T> { @Override public TypeSerializer<T> createSerializer(ExecutionConfig config) { if(config.isForceKryoEnabled()) { - return new KryoSerializer<T>(this.typeClass, config); + return new KryoSerializer<T>(getTypeClass(), config); } if(config.isForceAvroEnabled()) { - return new AvroSerializer<T>(this.typeClass); + return new AvroSerializer<T>(getTypeClass()); } TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ]; Field[] reflectiveFields = new Field[fields.length]; for (int i = 0; i < fields.length; i++) { - fieldSerializers[i] = fields[i].type.createSerializer(config); - reflectiveFields[i] = fields[i].field; + fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config); + reflectiveFields[i] = fields[i].getField(); } - return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields, config); + return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config); } - - // -------------------------------------------------------------------------------------------- @Override public boolean equals(Object obj) { - return (obj instanceof PojoTypeInfo) && ((PojoTypeInfo<?>) obj).typeClass == this.typeClass; + if (obj instanceof PojoTypeInfo) { + @SuppressWarnings("unchecked") + PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj; + + return pojoTypeInfo.canEqual(this) && + super.equals(pojoTypeInfo) && + Arrays.equals(fields, pojoTypeInfo.fields) && + totalFields == pojoTypeInfo.totalFields; + } else { + return false; + } } @Override public int hashCode() { - return typeClass.hashCode() + 1387562934; + return 31 * (31 * Arrays.hashCode(fields) + totalFields) + super.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof PojoTypeInfo; } @Override public String toString() { List<String> fieldStrings = new ArrayList<String>(); for (PojoField field : fields) { - fieldStrings.add(field.field.getName() + ": " + field.type.toString()); + fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString()); } - return "PojoType<" + typeClass.getName() + return "PojoType<" + getTypeClass().getName() + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" + ">"; } + // -------------------------------------------------------------------------------------------- + + private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> { + + private ArrayList<TypeComparator> fieldComparators; + private ArrayList<Field> keyFields; + + public PojoTypeComparatorBuilder() { + fieldComparators = new ArrayList<TypeComparator>(); + keyFields = new ArrayList<Field>(); + } + + + @Override + public void initializeTypeComparatorBuilder(int size) { + fieldComparators.ensureCapacity(size); + keyFields.ensureCapacity(size); + } + + @Override + public void addComparatorField(int fieldId, TypeComparator<?> comparator) { + fieldComparators.add(comparator); + keyFields.add(fields[fieldId].getField()); + } + + @Override + public TypeComparator<T> createTypeComparator(ExecutionConfig config) { + Preconditions.checkState( + keyFields.size() > 0, + "No keys were defined for the PojoTypeComparatorBuilder."); + + Preconditions.checkState( + fieldComparators.size() > 0, + "No type comparators were defined for the PojoTypeComparatorBuilder."); + + Preconditions.checkState( + keyFields.size() == fieldComparators.size(), + "Number of key fields and field comparators is not equal."); + + return new PojoComparator<T>( + keyFields.toArray(new Field[keyFields.size()]), + fieldComparators.toArray(new TypeComparator[fieldComparators.size()]), + createSerializer(config), + getTypeClass()); + } + } + public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor { private String fieldName; http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java index 9857eb6..e9ce102 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java @@ -70,12 +70,22 @@ public class RecordTypeInfo extends TypeInformation<Record> { @Override public int hashCode() { - return Record.class.hashCode() ^ 0x165667b1; + return Record.class.hashCode(); } - + + @Override + public boolean canEqual(Object obj) { + return obj instanceof RecordTypeInfo; + } + @Override public boolean equals(Object obj) { - return obj.getClass() == RecordTypeInfo.class; + if (obj instanceof RecordTypeInfo) { + RecordTypeInfo recordTypeInfo = (RecordTypeInfo) obj; + return recordTypeInfo.canEqual(this); + } else { + return false; + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index 618b190..30710e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -18,8 +18,12 @@ package org.apache.flink.api.java.typeutils; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -52,10 +56,13 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) { super(tupleType, types); - if (types == null || types.length > Tuple.MAX_ARITY) { - throw new IllegalArgumentException(); - } + + Preconditions.checkArgument( + types.length <= Tuple.MAX_ARITY, + "The tuple type exceeds the maximum supported arity."); + this.fieldNames = new String[types.length]; + for (int i = 0; i < types.length; i++) { fieldNames[i] = "f" + i; } @@ -78,7 +85,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { @SuppressWarnings("unchecked") @Override public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) { - if (this.tupleType == Tuple0.class) { + if (getTypeClass() == Tuple0.class) { return (TupleSerializer<T>) Tuple0Serializer.INSTANCE; } @@ -91,48 +98,65 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { return new TupleSerializer<T>(tupleClass, fieldSerializers); } - - /** - * Comparator creation - */ - private TypeComparator<?>[] fieldComparators; - private int[] logicalKeyFields; - private int comparatorHelperIndex = 0; - - @Override - protected void initializeNewComparator(int localKeyCount) { - fieldComparators = new TypeComparator<?>[localKeyCount]; - logicalKeyFields = new int[localKeyCount]; - comparatorHelperIndex = 0; - } @Override - protected void addCompareField(int fieldId, TypeComparator<?> comparator) { - fieldComparators[comparatorHelperIndex] = comparator; - logicalKeyFields[comparatorHelperIndex] = fieldId; - comparatorHelperIndex++; + protected TypeComparatorBuilder<T> createTypeComparatorBuilder() { + return new TupleTypeComparatorBuilder(); } - @Override - protected TypeComparator<T> getNewComparator(ExecutionConfig executionConfig) { - @SuppressWarnings("rawtypes") - final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex); - final int[] finalLogicalKeyFields = Arrays.copyOf(logicalKeyFields, comparatorHelperIndex); - //final TypeSerializer[] finalFieldSerializers = Arrays.copyOf(fieldSerializers, comparatorHelperIndex); - // create the serializers for the prefix up to highest key position - int maxKey = 0; - for(int key : finalLogicalKeyFields) { - maxKey = Math.max(maxKey, key); + private class TupleTypeComparatorBuilder implements TypeComparatorBuilder<T> { + + private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>(); + private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>(); + + @Override + public void initializeTypeComparatorBuilder(int size) { + fieldComparators.ensureCapacity(size); + logicalKeyFields.ensureCapacity(size); } - TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; - for (int i = 0; i <= maxKey; i++) { - fieldSerializers[i] = types[i].createSerializer(executionConfig); + + @Override + public void addComparatorField(int fieldId, TypeComparator<?> comparator) { + fieldComparators.add(comparator); + logicalKeyFields.add(fieldId); } - if(finalFieldComparators.length == 0 || finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 - || finalFieldComparators.length != finalLogicalKeyFields.length) { - throw new IllegalArgumentException("Tuple comparator creation has a bug"); + + @Override + public TypeComparator<T> createTypeComparator(ExecutionConfig config) { + Preconditions.checkState( + fieldComparators.size() > 0, + "No field comparators were defined for the TupleTypeComparatorBuilder." + ); + + Preconditions.checkState( + logicalKeyFields.size() > 0, + "No key fields were defined for the TupleTypeComparatorBuilder." + ); + + Preconditions.checkState( + fieldComparators.size() == logicalKeyFields.size(), + "The number of field comparators and key fields is not equal." + ); + + final int maxKey = Collections.max(logicalKeyFields); + + Preconditions.checkState( + maxKey >= 0, + "The maximum key field must be greater or equal than 0." + ); + + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; + + for (int i = 0; i <= maxKey; i++) { + fieldSerializers[i] = types[i].createSerializer(config); + } + + return new TupleComparator<T>( + Ints.toArray(logicalKeyFields), + fieldComparators.toArray(new TypeComparator[fieldComparators.size()]), + fieldSerializers + ); } - return new TupleComparator<T>(finalLogicalKeyFields, finalFieldComparators, fieldSerializers); } // -------------------------------------------------------------------------------------------- @@ -142,17 +166,22 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { if (obj instanceof TupleTypeInfo) { @SuppressWarnings("unchecked") TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj; - return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) && - Arrays.deepEquals(this.types, other.types); - + return other.canEqual(this) && + super.equals(other) && + Arrays.equals(fieldNames, other.fieldNames); } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TupleTypeInfo; + } @Override public int hashCode() { - return this.types.hashCode() ^ Arrays.deepHashCode(this.types); + return 31 * super.hashCode() + Arrays.hashCode(fieldNames); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index a2d937f..469476f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; @@ -45,17 +46,20 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { protected final TypeInformation<?>[] types; - protected final Class<T> tupleType; - - private int totalFields; + private final int totalFields; public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) { super(tupleType); - this.tupleType = tupleType; - this.types = types; + + this.types = Preconditions.checkNotNull(types); + + int fieldCounter = 0; + for(TypeInformation<?> type : types) { - totalFields += type.getTotalFields(); + fieldCounter += type.getTotalFields(); } + + totalFields = fieldCounter; } @Override @@ -83,11 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { } @Override - public Class<T> getTypeClass() { - return tupleType; - } - - @Override public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); @@ -109,53 +108,49 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { } keyPosition++; } - return; - } + } else { + String fieldStr = matcher.group(1); + Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); - String fieldStr = matcher.group(1); - Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr); - if (!fieldMatcher.matches()) { - throw new RuntimeException("Invalid matcher pattern"); - } - field = fieldMatcher.group(2); - int fieldPos = Integer.valueOf(field); + if (!fieldMatcher.matches()) { + throw new RuntimeException("Invalid matcher pattern"); + } - if (fieldPos >= this.getArity()) { - throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + "."); - } - TypeInformation<?> fieldType = this.getTypeAt(fieldPos); - String tail = matcher.group(5); - if(tail == null) { - if(fieldType instanceof CompositeType) { - // forward offsets - for(int i=0; i<fieldPos; i++) { - offset += this.getTypeAt(i).getTotalFields(); - } - // add all fields of composite type - ((CompositeType<?>) fieldType).getFlatFields("*", offset, result); - return; - } else { - // we found the field to add - // compute flat field position by adding skipped fields - int flatFieldPos = offset; - for(int i=0; i<fieldPos; i++) { - flatFieldPos += this.getTypeAt(i).getTotalFields(); - } - result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); - // nothing left to do - return; + field = fieldMatcher.group(2); + int fieldPos = Integer.valueOf(field); + + if (fieldPos >= this.getArity()) { + throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + "."); } - } else { - if(fieldType instanceof CompositeType<?>) { - // forward offset - for(int i=0; i<fieldPos; i++) { - offset += this.getTypeAt(i).getTotalFields(); + TypeInformation<?> fieldType = this.getTypeAt(fieldPos); + String tail = matcher.group(5); + if (tail == null) { + if (fieldType instanceof CompositeType) { + // forward offsets + for (int i = 0; i < fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + // add all fields of composite type + ((CompositeType<?>) fieldType).getFlatFields("*", offset, result); + } else { + // we found the field to add + // compute flat field position by adding skipped fields + int flatFieldPos = offset; + for (int i = 0; i < fieldPos; i++) { + flatFieldPos += this.getTypeAt(i).getTotalFields(); + } + result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); } - ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result); - // nothing left to do - return; } else { - throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + if (fieldType instanceof CompositeType<?>) { + // forward offset + for (int i = 0; i < fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result); + } else { + throw new InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + "."); + } } } } @@ -213,17 +208,24 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { if (obj instanceof TupleTypeInfoBase) { @SuppressWarnings("unchecked") TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj; - return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) && - Arrays.deepEquals(this.types, other.types); - + + return other.canEqual(this) && + super.equals(other) && + Arrays.equals(types, other.types) && + totalFields == other.totalFields; } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TupleTypeInfoBase; + } @Override public int hashCode() { - return this.types.hashCode() ^ Arrays.deepHashCode(this.types); + return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + totalFields; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 8f5d599..0196b5d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -526,12 +526,33 @@ public class TypeExtractor { } catch (ClassNotFoundException e) { throw new InvalidTypesException("Could not convert GenericArrayType to Class."); } + return getForClass(classArray); + } else { + TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy( + typeHierarchy, + genericArray.getGenericComponentType(), + in1Type, + in2Type); + + Class<OUT> classArray; + + try { + String componentClassName = componentInfo.getTypeClass().getName(); + String resultingClassName; + + if (componentClassName.startsWith("[")) { + resultingClassName = "[" + componentClassName; + } else { + resultingClassName = "[L" + componentClassName + ";"; + } + classArray = (Class<OUT>) Class.forName(resultingClassName); + } catch (ClassNotFoundException e) { + throw new InvalidTypesException("Could not convert GenericArrayType to Class."); + } + + return ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo); } - - TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy(typeHierarchy, genericArray.getGenericComponentType(), - in1Type, in2Type); - return ObjectArrayTypeInfo.getInfoFor(t, componentInfo); } // objects with generics are treated as Class first else if (t instanceof ParameterizedType) { @@ -1188,7 +1209,13 @@ public class TypeExtractor { // object arrays else { - return ObjectArrayTypeInfo.getInfoFor(clazz); + TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy( + typeHierarchy, + clazz.getComponentType(), + in1Type, + in2Type); + + return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo); } } @@ -1481,8 +1508,8 @@ public class TypeExtractor { private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) { for (int j = 0; j < pojoInfo.getArity(); j++) { PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j); - if (pf.field.getName().equals(field.getName())) { - return pf.type; + if (pf.getField().getName().equals(field.getName())) { + return pf.getTypeInformation(); } } return null; http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index e61acd8..0b4823e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.AtomicType; @@ -52,17 +53,13 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement private static final long serialVersionUID = 1L; private final Class<T> type; - public ValueTypeInfo(Class<T> type) { - if (type == null) { - throw new NullPointerException(); - } - if (!Value.class.isAssignableFrom(type) && !type.equals(Value.class)) { - throw new IllegalArgumentException("ValueTypeInfo can only be used for subclasses of " + Value.class.getName()); - } - - this.type = type; + this.type = Preconditions.checkNotNull(type); + + Preconditions.checkArgument( + Value.class.isAssignableFrom(type) || type.equals(Value.class), + "ValueTypeInfo can only be used for subclasses of " + Value.class.getName()); } @Override @@ -136,17 +133,26 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement @Override public int hashCode() { - return this.type.hashCode() ^ 0xd3a2646c; + return this.type.hashCode(); } @Override public boolean equals(Object obj) { - if (obj.getClass() == ValueTypeInfo.class) { - return type == ((ValueTypeInfo<?>) obj).type; + if (obj instanceof ValueTypeInfo) { + @SuppressWarnings("unchecked") + ValueTypeInfo<T> valueTypeInfo = (ValueTypeInfo<T>) obj; + + return valueTypeInfo.canEqual(this) && + type == valueTypeInfo.type; } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof ValueTypeInfo; + } @Override public String toString() { @@ -155,7 +161,7 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement // -------------------------------------------------------------------------------------------- - static final <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) { + static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) { if (Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class)) { return new ValueTypeInfo<X>(typeClass); } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java index a91b888..6c140d9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.typeutils; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.AtomicType; @@ -41,13 +42,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp private final Class<T> typeClass; public WritableTypeInfo(Class<T> typeClass) { - if (typeClass == null) { - throw new NullPointerException(); - } - if (!Writable.class.isAssignableFrom(typeClass) || typeClass == Writable.class) { - throw new IllegalArgumentException("WritableTypeInfo can only be used for subclasses of " + Writable.class.getName()); - } - this.typeClass = typeClass; + this.typeClass = Preconditions.checkNotNull(typeClass); + + Preconditions.checkArgument( + Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class), + "WritableTypeInfo can only be used for subclasses of " + Writable.class.getName()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -104,12 +103,26 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp @Override public int hashCode() { - return typeClass.hashCode() ^ 0xd3a2646c; + return typeClass.hashCode(); } @Override public boolean equals(Object obj) { - return obj.getClass() == WritableTypeInfo.class && typeClass == ((WritableTypeInfo<?>) obj).typeClass; + if (obj instanceof WritableTypeInfo) { + @SuppressWarnings("unchecked") + WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj; + + return writableTypeInfo.canEqual(this) && + typeClass == writableTypeInfo.typeClass; + + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof WritableTypeInfo; } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java index 34dc500..26bf4ce 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -25,6 +25,7 @@ import java.io.IOException; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Preconditions; import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; @@ -69,14 +70,10 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { } public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) { - if (type == null || typeToInstantiate == null) { - throw new NullPointerException(); - } + this.type = Preconditions.checkNotNull(type); + this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); InstantiationUtil.checkForInstantiation(typeToInstantiate); - - this.type = type; - this.typeToInstantiate = typeToInstantiate; } // -------------------------------------------------------------------------------------------- @@ -192,16 +189,25 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { @Override public int hashCode() { - return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode(); + return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); } @Override public boolean equals(Object obj) { - if (obj.getClass() == AvroSerializer.class) { - AvroSerializer<?> other = (AvroSerializer<?>) obj; - return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate; + if (obj instanceof AvroSerializer) { + @SuppressWarnings("unchecked") + AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj; + + return avroSerializer.canEqual(this) && + type == avroSerializer.type && + typeToInstantiate == avroSerializer.typeToInstantiate; } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof AvroSerializer; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java index 193d495..9e46f27 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import java.io.IOException; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -38,7 +39,7 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer public CopyableValueSerializer(Class<T> valueClass) { - this.valueClass = valueClass; + this.valueClass = Preconditions.checkNotNull(valueClass); } @Override @@ -105,16 +106,24 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer @Override public int hashCode() { - return this.valueClass.hashCode() + 9231; + return this.valueClass.hashCode(); } @Override public boolean equals(Object obj) { - if (obj.getClass() == CopyableValueSerializer.class) { - CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj; - return this.valueClass == other.valueClass; + if (obj instanceof CopyableValueSerializer) { + @SuppressWarnings("unchecked") + CopyableValueSerializer<T> copyableValueSerializer = (CopyableValueSerializer<T>) obj; + + return copyableValueSerializer.canEqual(this) && + valueClass == copyableValueSerializer.valueClass; } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof CopyableValueSerializer; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 5d4553d..de24956 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -30,7 +30,9 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -51,33 +53,33 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private final Class<T> clazz; - private TypeSerializer<Object>[] fieldSerializers; + private final TypeSerializer<Object>[] fieldSerializers; - // We need to handle these ourselves in writeObject()/readObject() - private transient Field[] fields; - - private int numFields; + private final int numFields; - private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache; - private transient ClassLoader cl; + private final Map<Class<?>, Integer> registeredClasses; - private Map<Class<?>, Integer> registeredClasses; - - private TypeSerializer<?>[] registeredSerializers; + private final TypeSerializer<?>[] registeredSerializers; private final ExecutionConfig executionConfig; + private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache; + private transient ClassLoader cl; + // We need to handle these ourselves in writeObject()/readObject() + private transient Field[] fields; + @SuppressWarnings("unchecked") public PojoSerializer( Class<T> clazz, TypeSerializer<?>[] fieldSerializers, Field[] fields, ExecutionConfig executionConfig) { - this.clazz = clazz; - this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers; - this.fields = fields; + + this.clazz = Preconditions.checkNotNull(clazz); + this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers); + this.fields = Preconditions.checkNotNull(fields); this.numFields = fieldSerializers.length; - this.executionConfig = executionConfig; + this.executionConfig = Preconditions.checkNotNull(executionConfig); LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); @@ -563,23 +565,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { @Override public int hashCode() { - int hashCode = numFields * 47; - for (TypeSerializer<?> ser : this.fieldSerializers) { - hashCode = (hashCode << 7) | (hashCode >>> -7); - hashCode += ser.hashCode(); - } - return hashCode; + return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) + + Objects.hash(clazz, numFields, registeredClasses); } @Override public boolean equals(Object obj) { - if (obj != null && obj instanceof PojoSerializer) { - PojoSerializer<?> otherTS = (PojoSerializer<?>) obj; - return (otherTS.clazz == this.clazz) && - Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers); - } - else { + if (obj instanceof PojoSerializer) { + PojoSerializer<?> other = (PojoSerializer<?>) obj; + + return other.canEqual(this) && + clazz == other.clazz && + Arrays.equals(fieldSerializers, other.fieldSerializers) && + Arrays.equals(registeredSerializers, other.registeredSerializers) && + numFields == other.numFields && + registeredClasses.equals(other.registeredClasses); + } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof PojoSerializer; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java index 246cecf..a06ff1a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java @@ -13,6 +13,7 @@ package org.apache.flink.api.java.typeutils.runtime; import java.io.IOException; + import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.core.memory.DataInputView; @@ -94,12 +95,23 @@ public class Tuple0Serializer extends TupleSerializer<Tuple0> { @Override public int hashCode() { - return 1837461876; + return Tuple0Serializer.class.hashCode(); } @Override public boolean equals(Object obj) { - return obj == this || obj instanceof Tuple0Serializer; + if (obj instanceof Tuple0Serializer) { + Tuple0Serializer other = (Tuple0Serializer) obj; + + return other.canEqual(this); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof Tuple0Serializer; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index f041736..bf3c7a1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -18,12 +18,14 @@ package org.apache.flink.api.java.typeutils.runtime; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import java.io.IOException; import java.util.Arrays; +import java.util.Objects; public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { @@ -32,14 +34,14 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { protected final Class<T> tupleClass; - protected TypeSerializer<Object>[] fieldSerializers; + protected final TypeSerializer<Object>[] fieldSerializers; protected final int arity; @SuppressWarnings("unchecked") public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { - this.tupleClass = tupleClass; - this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers; + this.tupleClass = Preconditions.checkNotNull(tupleClass); + this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers); this.arity = fieldSerializers.length; } @@ -74,23 +76,25 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { @Override public int hashCode() { - int hashCode = arity * 47; - for (TypeSerializer<?> ser : this.fieldSerializers) { - hashCode = (hashCode << 7) | (hashCode >>> -7); - hashCode += ser.hashCode(); - } - return hashCode; + return 31 * Arrays.hashCode(fieldSerializers) + Objects.hash(tupleClass, arity); } @Override public boolean equals(Object obj) { - if (obj != null && obj instanceof TupleSerializerBase) { - TupleSerializerBase<?> otherTS = (TupleSerializerBase<?>) obj; - return (otherTS.tupleClass == this.tupleClass) && - Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers); - } - else { + if (obj instanceof TupleSerializerBase) { + TupleSerializerBase<?> other = (TupleSerializerBase<?>) obj; + + return other.canEqual(this) && + tupleClass == other.tupleClass && + Arrays.equals(fieldSerializers, other.fieldSerializers) && + arity == other.arity; + } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TupleSerializerBase; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java index ad1b0f0..179ef19 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import java.io.IOException; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -47,11 +48,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> { // -------------------------------------------------------------------------------------------- public ValueSerializer(Class<T> type) { - if (type == null) { - throw new NullPointerException(); - } - - this.type = type; + this.type = Preconditions.checkNotNull(type); } // -------------------------------------------------------------------------------------------- @@ -126,16 +123,22 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> { @Override public int hashCode() { - return this.type.hashCode() + 17; + return this.type.hashCode(); } @Override public boolean equals(Object obj) { - if (obj.getClass() == ValueSerializer.class) { + if (obj instanceof ValueSerializer) { ValueSerializer<?> other = (ValueSerializer<?>) obj; - return this.type == other.type; + + return other.canEqual(this) && type == other.type; } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof ValueSerializer; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index 60012ee..d854f52 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -121,16 +121,22 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> { @Override public int hashCode() { - return this.typeClass.hashCode() + 177; + return this.typeClass.hashCode(); } @Override public boolean equals(Object obj) { - if (obj.getClass() == WritableSerializer.class) { + if (obj instanceof WritableSerializer) { WritableSerializer<?> other = (WritableSerializer<?>) obj; - return this.typeClass == other.typeClass; + + return other.canEqual(this) && typeClass == other.typeClass; } else { return false; } } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof WritableSerializer; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index 1bc6771..f825fc6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -25,6 +25,7 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Preconditions; import com.twitter.chill.ScalaKryoInstantiator; import org.apache.avro.generic.GenericData; @@ -45,6 +46,7 @@ import java.lang.reflect.Modifier; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; +import java.util.Objects; /** * A type serializer that serializes its type using the Kryo serialization @@ -84,10 +86,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { // ------------------------------------------------------------------------ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){ - if(type == null){ - throw new NullPointerException("Type class cannot be null."); - } - this.type = type; + this.type = Preconditions.checkNotNull(type); this.defaultSerializers = executionConfig.getDefaultKryoSerializers(); this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses(); @@ -241,19 +240,34 @@ public class KryoSerializer<T> extends TypeSerializer<T> { @Override public int hashCode() { - return type.hashCode(); + return Objects.hash( + type, + registeredTypes, + registeredTypesWithSerializerClasses, + defaultSerializerClasses); } @Override public boolean equals(Object obj) { - if (obj != null && obj instanceof KryoSerializer) { + if (obj instanceof KryoSerializer) { KryoSerializer<?> other = (KryoSerializer<?>) obj; - return other.type == this.type; + + // we cannot include the Serializers here because they don't implement the equals method + return other.canEqual(this) && + type == other.type && + registeredTypes.equals(other.registeredTypes) && + registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) && + defaultSerializerClasses.equals(other.defaultSerializerClasses); } else { return false; } } - + + @Override + public boolean canEqual(Object obj) { + return obj instanceof KryoSerializer; + } + // -------------------------------------------------------------------------------------------- private void checkKryoInitialized() { http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index ad491d0..ebaa44d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; public class CollectionInputFormatTest { @@ -265,8 +266,8 @@ public class CollectionInputFormatTest { private static final long serialVersionUID = 1L; - private boolean failOnRead; - private boolean failOnWrite; + private final boolean failOnRead; + private final boolean failOnWrite; public TestSerializer(boolean failOnRead, boolean failOnWrite) { this.failOnRead = failOnRead; @@ -331,5 +332,26 @@ public class CollectionInputFormatTest { public void copy(DataInputView source, DataOutputView target) throws IOException { target.writeInt(source.readInt()); } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TestSerializer) { + TestSerializer other = (TestSerializer) obj; + + return other.canEqual(this) && failOnRead == other.failOnRead && failOnWrite == other.failOnWrite; + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TestSerializer; + } + + @Override + public int hashCode() { + return Objects.hash(failOnRead, failOnWrite); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 34fde20..96ba264 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -336,37 +336,37 @@ public class PojoTypeExtractionTest { tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false; for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) { PojoField field = pojoTypeComplexNested.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("date")) { if(dateSeen) { Assert.fail("already seen"); } dateSeen = true; - Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type); - Assert.assertEquals(Date.class, field.type.getTypeClass()); + Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(Date.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("someNumberWithÃnicödeNäme")) { if(intSeen) { Assert.fail("already seen"); } intSeen = true; - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); - Assert.assertEquals(Integer.class, field.type.getTypeClass()); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(Integer.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("someFloat")) { if(floatSeen) { Assert.fail("already seen"); } floatSeen = true; - Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type); - Assert.assertEquals(Float.class, field.type.getTypeClass()); + Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(Float.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("word")) { if(tupleSeen) { Assert.fail("already seen"); } tupleSeen = true; - Assert.assertTrue(field.type instanceof TupleTypeInfo<?>); - Assert.assertEquals(Tuple3.class, field.type.getTypeClass()); + Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>); + Assert.assertEquals(Tuple3.class, field.getTypeInformation().getTypeClass()); // do some more advanced checks on the tuple - TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type; + TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.getTypeInformation(); Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0)); Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1)); Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2)); @@ -375,21 +375,21 @@ public class PojoTypeExtractionTest { Assert.fail("already seen"); } objectSeen = true; - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); - Assert.assertEquals(Object.class, field.type.getTypeClass()); + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); + Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("hadoopCitizen")) { if(writableSeen) { Assert.fail("already seen"); } writableSeen = true; - Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type); - Assert.assertEquals(MyWritable.class, field.type.getTypeClass()); + Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation()); + Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("collection")) { if(collectionSeen) { Assert.fail("already seen"); } collectionSeen = true; - Assert.assertEquals(new GenericTypeInfo(List.class), field.type); + Assert.assertEquals(new GenericTypeInfo(List.class), field.getTypeInformation()); } else { Assert.fail("field "+field+" is not expected"); @@ -428,28 +428,28 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; for(int i = 0; i < pojoTypeForClass.getArity(); i++) { PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("somethingFancy")) { if(arrayListSeen) { Assert.fail("already seen"); } arrayListSeen = true; - Assert.assertTrue(field.type instanceof GenericTypeInfo); - Assert.assertEquals(ArrayList.class, field.type.getTypeClass()); + Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo); + Assert.assertEquals(ArrayList.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("fancyIds")) { if(multisetSeen) { Assert.fail("already seen"); } multisetSeen = true; - Assert.assertTrue(field.type instanceof GenericTypeInfo); - Assert.assertEquals(HashMultiset.class, field.type.getTypeClass()); + Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo); + Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass()); } else if(name.equals("fancyArray")) { if(strArraySeen) { Assert.fail("already seen"); } strArraySeen = true; - Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type); - Assert.assertEquals(String[].class, field.type.getTypeClass()); + Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation()); + Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass()); } else if(Arrays.asList("date", "someNumberWithÃnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) { // ignore these, they are inherited from the ComplexNestedClass } @@ -479,13 +479,13 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; for(int i = 0; i < pojoTypeForClass.getArity(); i++) { PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("special")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); } else if(name.equals("f0") || name.equals("f1")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); } else if(name.equals("f2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); } else { Assert.fail("unexpected field"); } @@ -499,15 +499,15 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; for(int i = 0; i < pojoTypeForClass.getArity(); i++) { PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("field1")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("field2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("field3")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); } else { Assert.fail("Unexpected field "+field); } @@ -525,13 +525,13 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; for(int i = 0; i < pojoTypeForClass.getArity(); i++) { PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("field1")) { - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); } else if (name.equals("field2")) { - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type); + Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); } else { Assert.fail("Unexpected field "+field); } @@ -546,14 +546,14 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; for(int i = 0; i < pojoTypeForClass.getArity(); i++) { PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("field1")) { - Assert.assertTrue(field.type instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!) + Assert.assertTrue(field.getTypeInformation() instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!) } else if (name.equals("field2")) { - Assert.assertTrue(field.type instanceof TupleTypeInfo<?>); - Assert.assertTrue( ((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) ); + Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>); + Assert.assertTrue( ((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) ); } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); } else { Assert.fail("Unexpected field "+field); } @@ -641,13 +641,13 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; for(int i = 0; i < pti.getArity(); i++) { PojoField field = pti.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("field1")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("field2")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); } else { Assert.fail("Unexpected field "+field); } @@ -680,15 +680,15 @@ public class PojoTypeExtractionTest { PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; for(int i = 0; i < pti.getArity(); i++) { PojoField field = pti.getPojoFieldAt(i); - String name = field.field.getName(); + String name = field.getField().getName(); if(name.equals("extraField")) { - Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("f0")) { - Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("f1")) { - Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation()); } else if (name.equals("f2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type); + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); } else { Assert.fail("Unexpected field "+field); } @@ -831,7 +831,7 @@ public class PojoTypeExtractionTest { public void testRecursivePojo1() { TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class); Assert.assertTrue(ti instanceof PojoTypeInfo); - Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass()); + Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation().getClass()); } @Test @@ -839,8 +839,8 @@ public class PojoTypeExtractionTest { TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class); Assert.assertTrue(ti instanceof PojoTypeInfo); PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0); - Assert.assertTrue(pf.type instanceof TupleTypeInfo); - Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass()); + Assert.assertTrue(pf.getTypeInformation() instanceof TupleTypeInfo); + Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.getTypeInformation()).getTypeAt(0).getClass()); } @Test @@ -848,8 +848,8 @@ public class PojoTypeExtractionTest { TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class); Assert.assertTrue(ti instanceof PojoTypeInfo); PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0); - Assert.assertTrue(pf.type instanceof PojoTypeInfo); - Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass()); + Assert.assertTrue(pf.getTypeInformation() instanceof PojoTypeInfo); + Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass()); } public static class FooBarPojo { http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index d27a82b..7f0cf5b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -66,6 +66,8 @@ import org.apache.hadoop.io.Writable; import org.junit.Assert; import org.junit.Test; +import javax.xml.bind.TypeConstraintException; + public class TypeExtractorTest { @@ -1237,7 +1239,7 @@ public class TypeExtractorTest { TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$CustomArrayObject[]")); Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>); - Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentType()); + Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass()); } @SuppressWarnings({ "rawtypes", "unchecked" })