http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java deleted file mode 100644 index f7e4e42..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; -import org.apache.flink.api.java.typeutils.runtime.PojoComparator; -import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; - -import com.google.common.base.Joiner; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; - -/** - * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs, - * since the conditions are slightly different from Java Beans. - * A type is considered a FLink POJO type, if it fulfills the conditions below. - * <ul> - * <li>It is a public class, and standalone (not a non-static inner class)</li> - * <li>It has a public no-argument constructor.</li> - * <li>All fields are either public, or have public getters and setters.</li> - * </ul> - * - * @param <T> The type represented by this type information. 
- */ -public class PojoTypeInfo<T> extends CompositeType<T> { - - private static final long serialVersionUID = 1L; - - private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; - private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; - private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS - +"|\\"+ExpressionKeys.SELECT_ALL_CHAR - +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; - - private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); - private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); - - private final PojoField[] fields; - - private final int totalFields; - - public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) { - super(typeClass); - - Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()), - "POJO " + typeClass + " is not public"); - - this.fields = fields.toArray(new PojoField[fields.size()]); - - Arrays.sort(this.fields, new Comparator<PojoField>() { - @Override - public int compare(PojoField o1, PojoField o2) { - return o1.getField().getName().compareTo(o2.getField().getName()); - } - }); - - int counterFields = 0; - - for(PojoField field : fields) { - counterFields += field.getTypeInformation().getTotalFields(); - } - - totalFields = counterFields; - } - - @Override - public boolean isBasicType() { - return false; - } - - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return fields.length; - } - - @Override - public int getTotalFields() { - return totalFields; - } - - @Override - public boolean isSortKeyType() { - // Support for sorting POJOs that implement Comparable is not implemented yet. - // Since the order of fields in a POJO type is not well defined, sorting on fields - // gives only some undefined order. 
- return false; - } - - - @Override - public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { - - Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); - if(!matcher.matches()) { - throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\"."); - } - - String field = matcher.group(0); - if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { - // handle select all - int keyPosition = 0; - for(PojoField pField : fields) { - if(pField.getTypeInformation() instanceof CompositeType) { - CompositeType<?> cType = (CompositeType<?>)pField.getTypeInformation(); - cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); - keyPosition += cType.getTotalFields()-1; - } else { - result.add( - new NamedFlatFieldDescriptor( - pField.getField().getName(), - offset + keyPosition, - pField.getTypeInformation())); - } - keyPosition++; - } - return; - } else { - field = matcher.group(1); - } - - // get field - int fieldPos = -1; - TypeInformation<?> fieldType = null; - for (int i = 0; i < fields.length; i++) { - if (fields[i].getField().getName().equals(field)) { - fieldPos = i; - fieldType = fields[i].getTypeInformation(); - break; - } - } - if (fieldPos == -1) { - throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); - } - String tail = matcher.group(3); - if(tail == null) { - if(fieldType instanceof CompositeType) { - // forward offset - for(int i=0; i<fieldPos; i++) { - offset += this.getTypeAt(i).getTotalFields(); - } - // add all fields of composite type - ((CompositeType<?>) fieldType).getFlatFields("*", offset, result); - } else { - // we found the field to add - // compute flat field position by adding skipped fields - int flatFieldPos = offset; - for(int i=0; i<fieldPos; i++) { - flatFieldPos += this.getTypeAt(i).getTotalFields(); - } - 
result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); - } - } else { - if(fieldType instanceof CompositeType<?>) { - // forward offset - for(int i=0; i<fieldPos; i++) { - offset += this.getTypeAt(i).getTotalFields(); - } - ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result); - } else { - throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); - } - } - } - - @SuppressWarnings("unchecked") - @Override - public <X> TypeInformation<X> getTypeAt(String fieldExpression) { - - Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); - if(!matcher.matches()) { - if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { - throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); - } else { - throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\"."); - } - } - - String field = matcher.group(1); - // get field - int fieldPos = -1; - TypeInformation<?> fieldType = null; - for (int i = 0; i < fields.length; i++) { - if (fields[i].getField().getName().equals(field)) { - fieldPos = i; - fieldType = fields[i].getTypeInformation(); - break; - } - } - if (fieldPos == -1) { - throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); - } - - String tail = matcher.group(3); - if(tail == null) { - // we found the type - return (TypeInformation<X>) fieldType; - } else { - if(fieldType instanceof CompositeType<?>) { - return ((CompositeType<?>) fieldType).getTypeAt(tail); - } else { - throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); - } - } - } - - @Override - public <X> TypeInformation<X> getTypeAt(int pos) { - if (pos < 0 || pos >= this.fields.length) { - throw new IndexOutOfBoundsException(); - } - 
@SuppressWarnings("unchecked") - TypeInformation<X> typed = (TypeInformation<X>) fields[pos].getTypeInformation(); - return typed; - } - - @Override - protected TypeComparatorBuilder<T> createTypeComparatorBuilder() { - return new PojoTypeComparatorBuilder(); - } - - // used for testing. Maybe use mockito here - public PojoField getPojoFieldAt(int pos) { - if (pos < 0 || pos >= this.fields.length) { - throw new IndexOutOfBoundsException(); - } - return this.fields[pos]; - } - - public String[] getFieldNames() { - String[] result = new String[fields.length]; - for (int i = 0; i < fields.length; i++) { - result[i] = fields[i].getField().getName(); - } - return result; - } - - @Override - public int getFieldIndex(String fieldName) { - for (int i = 0; i < fields.length; i++) { - if (fields[i].getField().getName().equals(fieldName)) { - return i; - } - } - return -1; - } - - @Override - public TypeSerializer<T> createSerializer(ExecutionConfig config) { - if(config.isForceKryoEnabled()) { - return new KryoSerializer<T>(getTypeClass(), config); - } - if(config.isForceAvroEnabled()) { - return new AvroSerializer<T>(getTypeClass()); - } - - TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ]; - Field[] reflectiveFields = new Field[fields.length]; - - for (int i = 0; i < fields.length; i++) { - fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config); - reflectiveFields[i] = fields[i].getField(); - } - - return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PojoTypeInfo) { - @SuppressWarnings("unchecked") - PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj; - - return pojoTypeInfo.canEqual(this) && - super.equals(pojoTypeInfo) && - Arrays.equals(fields, pojoTypeInfo.fields) && - totalFields == pojoTypeInfo.totalFields; - } else { - return false; - } - } - - @Override - public int hashCode() { - return 31 * (31 * 
Arrays.hashCode(fields) + totalFields) + super.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof PojoTypeInfo; - } - - @Override - public String toString() { - List<String> fieldStrings = new ArrayList<String>(); - for (PojoField field : fields) { - fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString()); - } - return "PojoType<" + getTypeClass().getName() - + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" - + ">"; - } - - // -------------------------------------------------------------------------------------------- - - private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> { - - private ArrayList<TypeComparator> fieldComparators; - private ArrayList<Field> keyFields; - - public PojoTypeComparatorBuilder() { - fieldComparators = new ArrayList<TypeComparator>(); - keyFields = new ArrayList<Field>(); - } - - - @Override - public void initializeTypeComparatorBuilder(int size) { - fieldComparators.ensureCapacity(size); - keyFields.ensureCapacity(size); - } - - @Override - public void addComparatorField(int fieldId, TypeComparator<?> comparator) { - fieldComparators.add(comparator); - keyFields.add(fields[fieldId].getField()); - } - - @Override - public TypeComparator<T> createTypeComparator(ExecutionConfig config) { - Preconditions.checkState( - keyFields.size() > 0, - "No keys were defined for the PojoTypeComparatorBuilder."); - - Preconditions.checkState( - fieldComparators.size() > 0, - "No type comparators were defined for the PojoTypeComparatorBuilder."); - - Preconditions.checkState( - keyFields.size() == fieldComparators.size(), - "Number of key fields and field comparators is not equal."); - - return new PojoComparator<T>( - keyFields.toArray(new Field[keyFields.size()]), - fieldComparators.toArray(new TypeComparator[fieldComparators.size()]), - createSerializer(config), - getTypeClass()); - } - } - - public static class NamedFlatFieldDescriptor 
extends FlatFieldDescriptor { - - private String fieldName; - - public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation<?> type) { - super(keyPosition, type); - this.fieldName = name; - } - - public String getFieldName() { - return fieldName; - } - - @Override - public String toString() { - return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java deleted file mode 100644 index 22a18f1..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.typeinfo.TypeInformation; - -/** - * This interface can be implemented by functions and input formats to tell the framework - * about their produced data type. Implementing it acts as an alternative to the reflection analysis - * that is otherwise performed and is useful in situations where the produced data type may vary - * depending on parameterization. - * - * @param <T> The type described by the {@link TypeInformation} that {@link #getProducedType()} returns. - */ -public interface ResultTypeQueryable<T> { - - /** - * Gets the data type (as a {@link TypeInformation}) produced by this function or input format. 
- * - * @return The data type produced by this function or input format. - */ - TypeInformation<T> getProducedType(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java deleted file mode 100644 index 30710e5..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.api.java.typeutils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer;
//CHECKSTYLE.ON: AvoidStarImport
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.types.Value;

/**
 * A {@link TypeInformation} for the tuple types of the Java API.
 *
 * <p>The fields of a tuple type are named {@code "f0"}, {@code "f1"}, ... in field order.
 *
 * @param <T> The type of the tuple.
 */
public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {

    private static final long serialVersionUID = 1L;

    /** The names of the tuple fields: {@code "f" + i} for the field at position {@code i}. */
    protected final String[] fieldNames;

    /**
     * Creates the type information for the tuple class whose arity equals the number of given types.
     *
     * @param types The types of the tuple fields, in field order.
     */
    @SuppressWarnings("unchecked")
    public TupleTypeInfo(TypeInformation<?>... types) {
        this((Class<T>) Tuple.getTupleClass(types.length), types);
    }

    /**
     * Creates the type information for the given tuple class.
     *
     * @param tupleType The tuple class.
     * @param types The types of the tuple fields, in field order.
     * @throws IllegalArgumentException (via {@code Preconditions.checkArgument}) if more than
     *         {@code Tuple.MAX_ARITY} field types are given.
     */
    public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
        super(tupleType, types);

        Preconditions.checkArgument(
            types.length <= Tuple.MAX_ARITY,
            "The tuple type exceeds the maximum supported arity.");

        this.fieldNames = new String[types.length];

        for (int i = 0; i < types.length; i++) {
            fieldNames[i] = "f" + i;
        }
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    /**
     * Returns the position of the field with the given name.
     *
     * <p>The name is matched against the names in {@link #fieldNames}. Any name that is not one
     * of them (including {@code null}, the empty string, names without the {@code f} prefix and
     * names with a non-numeric or out-of-range index) yields {@code -1} instead of an exception.
     *
     * @param fieldName The name of the field to look up.
     * @return The index of the field, or {@code -1} if this tuple type has no field of that name.
     */
    @Override
    public int getFieldIndex(String fieldName) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(fieldName)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Creates the serializer for this tuple type from the serializers of its field types.
     * {@code Tuple0} is served by the shared {@code Tuple0Serializer.INSTANCE}.
     */
    @SuppressWarnings("unchecked")
    @Override
    public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) {
        if (getTypeClass() == Tuple0.class) {
            return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
        }

        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[getArity()];
        for (int i = 0; i < types.length; i++) {
            fieldSerializers[i] = types[i].createSerializer(executionConfig);
        }

        Class<T> tupleClass = getTypeClass();

        return new TupleSerializer<T>(tupleClass, fieldSerializers);
    }

    @Override
    protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
        return new TupleTypeComparatorBuilder();
    }

    /**
     * Collects the logical key fields and their comparators and builds a {@link TupleComparator}.
     */
    private class TupleTypeComparatorBuilder implements TypeComparatorBuilder<T> {

        private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
        private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();

        @Override
        public void initializeTypeComparatorBuilder(int size) {
            fieldComparators.ensureCapacity(size);
            logicalKeyFields.ensureCapacity(size);
        }

        @Override
        public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
            fieldComparators.add(comparator);
            logicalKeyFields.add(fieldId);
        }

        @Override
        public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
            Preconditions.checkState(
                fieldComparators.size() > 0,
                "No field comparators were defined for the TupleTypeComparatorBuilder."
            );

            Preconditions.checkState(
                logicalKeyFields.size() > 0,
                "No key fields were defined for the TupleTypeComparatorBuilder."
            );

            Preconditions.checkState(
                fieldComparators.size() == logicalKeyFields.size(),
                "The number of field comparators and key fields is not equal."
            );

            final int maxKey = Collections.max(logicalKeyFields);

            Preconditions.checkState(
                maxKey >= 0,
                "The maximum key field must be greater or equal than 0."
            );

            // serializers are created for the fields at positions 0..maxKey only
            TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];

            for (int i = 0; i <= maxKey; i++) {
                fieldSerializers[i] = types[i].createSerializer(config);
            }

            return new TupleComparator<T>(
                Ints.toArray(logicalKeyFields),
                fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
                fieldSerializers
            );
        }
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TupleTypeInfo) {
            @SuppressWarnings("unchecked")
            TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
            return other.canEqual(this) &&
                super.equals(other) &&
                Arrays.equals(fieldNames, other.fieldNames);
        } else {
            return false;
        }
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof TupleTypeInfo;
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
    }

    @Override
    public String toString() {
        return "Java " + super.toString();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a tuple type information whose fields are the basic types of the given classes.
     *
     * @param basicTypes The classes of the tuple fields, in field order.
     * @return The tuple type information.
     * @throws IllegalArgumentException if no classes are given, a class is {@code null}, or
     *         {@code BasicTypeInfo.getInfoFor} has no type information for a class.
     */
    public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>... basicTypes) {
        if (basicTypes == null || basicTypes.length == 0) {
            throw new IllegalArgumentException();
        }

        TypeInformation<?>[] infos = new TypeInformation<?>[basicTypes.length];
        for (int i = 0; i < infos.length; i++) {
            Class<?> type = basicTypes[i];
            if (type == null) {
                throw new IllegalArgumentException("Type at position " + i + " is null.");
            }

            TypeInformation<?> info = BasicTypeInfo.getInfoFor(type);
            if (info == null) {
                throw new IllegalArgumentException("Type at position " + i + " is not a basic type.");
            }
            infos[i] = info;
        }

        @SuppressWarnings("unchecked")
        TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new TupleTypeInfo<Tuple>(infos);
        return tupleInfo;
    }

    /**
     * Creates a tuple type information whose fields are basic types or basic value types.
     *
     * <p>For each class, {@code BasicTypeInfo.getInfoFor} is tried first; if it yields nothing,
     * the class is resolved through {@code ValueTypeInfo.getValueTypeInfo} and must be a basic
     * value type.
     *
     * @param basicTypes The classes of the tuple fields, in field order.
     * @return The tuple type information.
     * @throws IllegalArgumentException if no classes are given, a class is {@code null}, or a
     *         class is neither a basic type nor a basic value type.
     */
    @SuppressWarnings("unchecked")
    public static <X extends Tuple> TupleTypeInfo<X> getBasicAndBasicValueTupleTypeInfo(Class<?>... basicTypes) {
        if (basicTypes == null || basicTypes.length == 0) {
            throw new IllegalArgumentException();
        }

        TypeInformation<?>[] infos = new TypeInformation<?>[basicTypes.length];
        for (int i = 0; i < infos.length; i++) {
            Class<?> type = basicTypes[i];
            if (type == null) {
                throw new IllegalArgumentException("Type at position " + i + " is null.");
            }

            TypeInformation<?> info = BasicTypeInfo.getInfoFor(type);
            if (info == null) {
                try {
                    info = ValueTypeInfo.getValueTypeInfo((Class<Value>) type);
                    if (!((ValueTypeInfo<?>) info).isBasicValueType()) {
                        throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type.");
                    }
                } catch (ClassCastException | InvalidTypesException e) {
                    throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type.", e);
                }
            }
            infos[i] = info;
        }

        return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff 
--git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java deleted file mode 100644 index 469476f..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.api.java.typeutils;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;

/**
 * Base class for the type information of tuple-like types: composite types whose fields are
 * addressed by position, written either as a plain index ({@code "0"}) or with an {@code f}
 * prefix ({@code "f0"}), optionally followed by a nested expression ({@code "f0.f1"}).
 *
 * @param <T> The type represented by this type information.
 */
public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {

    private static final long serialVersionUID = 1L;

    private final static String REGEX_FIELD = "(f?)([0-9]+)";
    private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
    private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
            +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
            +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;

    private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD);
    private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
    private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);

    // --------------------------------------------------------------------------------------------

    /** The types of the fields, in field order. */
    protected final TypeInformation<?>[] types;

    /** Sum of {@code getTotalFields()} over all field types. */
    private final int totalFields;

    /**
     * @param tupleType The class of the tuple-like type.
     * @param types The types of the fields, in field order; must not be {@code null}.
     */
    public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
        super(tupleType);

        this.types = Preconditions.checkNotNull(types);

        int fieldCounter = 0;

        for(TypeInformation<?> type : types) {
            fieldCounter += type.getTotalFields();
        }

        totalFields = fieldCounter;
    }

    @Override
    public boolean isBasicType() {
        return false;
    }

    @Override
    public boolean isTupleType() {
        return true;
    }

    public boolean isCaseClass() {
        return false;
    }

    @Override
    public int getArity() {
        return types.length;
    }

    @Override
    public int getTotalFields() {
        return totalFields;
    }

    /**
     * Resolves a field expression into flat field descriptors and appends them to {@code result}.
     *
     * <p>A select-all expression adds every field (composite fields are expanded recursively).
     * A positional field adds either that atomic field, or, if the field is a composite type, the
     * fields selected by the nested expression (all of them when no nested expression is given).
     *
     * @param fieldExpression The field expression to resolve.
     * @param offset The flat position at which this type starts.
     * @param result The list the resolved descriptors are appended to.
     * @throws InvalidFieldReferenceException if the expression is malformed, the field position is
     *         out of bounds, or a nested expression is applied to a non-composite field.
     */
    @Override
    public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {

        Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
        if (!matcher.matches()) {
            throw new InvalidFieldReferenceException("Invalid tuple field reference \""+fieldExpression+"\".");
        }

        String field = matcher.group(0);
        if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
            // handle select all
            int keyPosition = 0;
            for (TypeInformation<?> type : types) {
                if (type instanceof CompositeType) {
                    CompositeType<?> cType = (CompositeType<?>) type;
                    cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
                    keyPosition += cType.getTotalFields();
                } else {
                    result.add(new FlatFieldDescriptor(offset + keyPosition, type));
                    keyPosition++;
                }
            }
            return;
        }

        int fieldPos = parseFieldPosition(matcher.group(1));
        TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
        String tail = matcher.group(5);

        if (fieldType instanceof CompositeType) {
            // without a nested expression, all fields of the composite type are added
            String nested = (tail == null) ? "*" : tail;
            ((CompositeType<?>) fieldType).getFlatFields(nested, flatPositionOf(fieldPos, offset), result);
        } else if (tail == null) {
            // we found the field to add
            result.add(new FlatFieldDescriptor(flatPositionOf(fieldPos, offset), fieldType));
        } else {
            throw new InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
        }
    }

    /**
     * Returns the type of the (possibly nested) field selected by the expression.
     *
     * @param fieldExpression The field expression; wildcards are rejected.
     * @return The type information of the selected field.
     * @throws InvalidFieldReferenceException if the expression is malformed or a wildcard, the
     *         field position is out of bounds, or a nested expression is applied to a
     *         non-composite field.
     */
    @Override
    public <X> TypeInformation<X> getTypeAt(String fieldExpression) {

        Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
        if(!matcher.matches()) {
            if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
                throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
            } else {
                throw new InvalidFieldReferenceException("Invalid format of tuple field expression \""+fieldExpression+"\".");
            }
        }

        int fieldPos = parseFieldPosition(matcher.group(1));

        TypeInformation<X> fieldType = this.getTypeAt(fieldPos);
        String tail = matcher.group(5);
        if(tail == null) {
            // we found the type
            return fieldType;
        } else {
            if(fieldType instanceof CompositeType<?>) {
                return ((CompositeType<?>) fieldType).getTypeAt(tail);
            } else {
                throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
            }
        }
    }

    @Override
    public <X> TypeInformation<X> getTypeAt(int pos) {
        if (pos < 0 || pos >= this.types.length) {
            throw new IndexOutOfBoundsException();
        }

        @SuppressWarnings("unchecked")
        TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
        return typed;
    }

    /**
     * Parses a positional field reference such as {@code "f3"} or {@code "3"} and checks it
     * against the arity of this type.
     *
     * @param fieldStr The field part of a field expression (without any nested tail).
     * @return The position of the referenced field.
     * @throws InvalidFieldReferenceException if the position is not smaller than the arity; this
     *         includes digit sequences that are too large to be represented as an {@code int}.
     */
    private int parseFieldPosition(String fieldStr) {
        Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
        if (!fieldMatcher.matches()) {
            throw new RuntimeException("Invalid matcher pattern");
        }

        int fieldPos;
        try {
            fieldPos = Integer.parseInt(fieldMatcher.group(2));
        } catch (NumberFormatException e) {
            // The pattern admits digits only, so parsing fails only for values beyond the int
            // range. Such a position is out of bounds for any tuple; report it as such.
            throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
        }

        if (fieldPos >= this.getArity()) {
            throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
        }
        return fieldPos;
    }

    /**
     * Computes the flat position of the field at {@code fieldPos}, i.e. {@code offset} forwarded
     * by the total number of flat fields of all preceding fields.
     */
    private int flatPositionOf(int fieldPos, int offset) {
        int flatFieldPos = offset;
        for (int i = 0; i < fieldPos; i++) {
            flatFieldPos += this.getTypeAt(i).getTotalFields();
        }
        return flatFieldPos;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TupleTypeInfoBase) {
            @SuppressWarnings("unchecked")
            TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;

            return other.canEqual(this) &&
                super.equals(other) &&
                Arrays.equals(types, other.types) &&
                totalFields == other.totalFields;
        } else {
            return false;
        }
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof TupleTypeInfoBase;
    }

    @Override
    public int hashCode() {
        return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + totalFields;
    }

    /** Renders the type as {@code Tuple<arity><type0, type1, ...>}; the type list is omitted for arity 0. */
    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder("Tuple");
        bld.append(types.length);
        if (types.length > 0) {
            bld.append('<').append(types[0]);

            for (int i = 1; i < types.length; i++) {
                bld.append(", ").append(types[i]);
            }

            bld.append('>');
        }
        return bld.toString();
    }

    @Override
    public boolean hasDeterministicFieldOrder() {
        return true;
    }
}