http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 9ecb9a2..56f90f4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -32,7 +32,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.Keys.FieldPositionKeys; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator; import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator; @@ -42,6 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; + /** * A {@link DataSet} that is the result of a CoGroup transformation. * @@ -74,16 +76,16 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU // sanity check solution set key mismatches if (input1 instanceof SolutionSetPlaceHolder) { - if (keys1 instanceof FieldPositionKeys) { - int[] positions = ((FieldPositionKeys<?>) keys1).computeLogicalKeyPositions(); + if (keys1 instanceof ExpressionKeys) { + int[] positions = ((ExpressionKeys<?>) keys1).computeLogicalKeyPositions(); ((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions); } else { throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions."); } } if (input2 instanceof SolutionSetPlaceHolder) { - if (keys2 instanceof FieldPositionKeys) { - int[] positions = ((FieldPositionKeys<?>) keys2).computeLogicalKeyPositions(); + if (keys2 instanceof ExpressionKeys) { + int[] positions = ((ExpressionKeys<?>) keys2).computeLogicalKeyPositions(); ((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions); } else { throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions."); @@ -108,10 +110,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { String name = getName() != null ? getName() : function.getClass().getName(); - - if (!keys1.areCompatibale(keys2)) { - throw new InvalidProgramException("The types of the key fields do not match. Left:" + - " " + keys1 + " Right: " + keys2); + try { + keys1.areCompatible(keys2); + } catch (IncompatibleKeysException e) { + throw new InvalidProgramException("The types of the key fields do not match.", e); } if (keys1 instanceof Keys.SelectorFunctionKeys @@ -166,15 +168,13 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU return po; } - else if ((keys1 instanceof Keys.FieldPositionKeys - && keys2 instanceof Keys.FieldPositionKeys) || - ((keys1 instanceof Keys.ExpressionKeys - && keys2 instanceof Keys.ExpressionKeys))) + else if ( keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { - - if (!keys1.areCompatibale(keys2)) { - throw new InvalidProgramException("The types of the key fields do not match."); - } + try { + keys1.areCompatible(keys2); + } catch (IncompatibleKeysException e) { + throw new InvalidProgramException("The types of the key fields do not match.", e); + } int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions(); int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions(); @@ -364,7 +364,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @see DataSet */ public CoGroupOperatorSetsPredicate where(int... fields) { - return new CoGroupOperatorSetsPredicate(new Keys.FieldPositionKeys<I1>(fields, input1.getType())); + return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType())); } /** @@ -380,9 +380,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @see Tuple * @see DataSet */ -// public CoGroupOperatorSetsPredicate where(String... fields) { -// return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType())); -// } + public CoGroupOperatorSetsPredicate where(String... fields) { + return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType())); + } /** * Continues a CoGroup transformation and defines a {@link KeySelector} function for the first co-grouped {@link DataSet}.</br> @@ -436,7 +436,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)} to finalize the CoGroup transformation. */ public CoGroupOperatorWithoutFunction equalTo(int... fields) { - return createCoGroupOperator(new Keys.FieldPositionKeys<I2>(fields, input2.getType())); + return createCoGroupOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType())); } /** @@ -448,9 +448,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @return An incomplete CoGroup transformation. * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)} to finalize the CoGroup transformation. */ -// public CoGroupOperatorWithoutFunction equalTo(String... fields) { -// return createCoGroupOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType())); -// } + public CoGroupOperatorWithoutFunction equalTo(String... fields) { + return createCoGroupOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType())); + } /** * Continues a CoGroup transformation and defines a {@link KeySelector} function for the second co-grouped {@link DataSet}.</br> @@ -480,9 +480,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU if (keys2.isEmpty()) { throw new InvalidProgramException("The co-group keys must not be empty."); } - - if (!keys1.areCompatibale(keys2)) { - throw new InvalidProgramException("The pair of co-group keys are not compatible with each other."); + try { + keys1.areCompatible(keys2); + } catch(IncompatibleKeysException ike) { + throw new InvalidProgramException("The pair of co-group keys are not compatible with each other.", ike); } return new CoGroupOperatorWithoutFunction(keys2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index 18cb8f6..54a65a9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -58,7 +58,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera for(int i = 0; i < tupleType.getArity(); i++) { allFields[i] = i; } - keys = new Keys.FieldPositionKeys<T>(allFields, input.getType(), true); + keys = new Keys.ExpressionKeys<T>(allFields, input.getType(), true); } else { throw new InvalidProgramException("Distinction on all fields is only possible on tuple data types."); @@ -67,7 +67,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera // FieldPositionKeys can only be applied on Tuples - if (keys instanceof Keys.FieldPositionKeys && !input.getType().isTupleType()) { + if (keys instanceof Keys.ExpressionKeys && !input.getType().isTupleType()) { throw new InvalidProgramException("Distinction on field positions is only possible on tuple data types."); } @@ -81,7 +81,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera String name = function.getClass().getName(); - if (keys instanceof Keys.FieldPositionKeys) { + if (keys instanceof Keys.ExpressionKeys) { int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getInputType(), getResultType()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index edae3c8..cbcc367 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -139,7 +139,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return po; } - else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) { + else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { //was field position key + //TODO ask stephan int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); @@ -166,19 +167,19 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return po; } - else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { - - int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); - UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = - new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); - - po.setCombinable(combinable); - po.setInput(input); - po.setDegreeOfParallelism(this.getParallelism()); - - return po; - } +// else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { +// +// int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); +// UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); +// GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = +// new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); +// +// po.setCombinable(combinable); +// po.setInput(input); +// po.setDegreeOfParallelism(this.getParallelism()); +// +// return po; +// } else { throw new UnsupportedOperationException("Unrecognized key type."); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index bc35c14..caa27dc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.Operator; @@ -33,23 +34,22 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; -import org.apache.flink.api.common.functions.RichFlatJoinFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.Keys.FieldPositionKeys; +import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingJoinOperator; import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingJoinOperator; import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingJoinOperator; import org.apache.flink.api.java.operators.translation.WrappingFunction; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; - //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import org.apache.flink.api.java.tuple.*; -import org.apache.flink.util.Collector; //CHECKSTYLE.ON: AvoidStarImport +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Collector; /** * A {@link DataSet} that is the result of a Join transformation. @@ -124,16 +124,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, // sanity check solution set key mismatches if (input1 instanceof SolutionSetPlaceHolder) { - if (keys1 instanceof FieldPositionKeys) { - int[] positions = ((FieldPositionKeys<?>) keys1).computeLogicalKeyPositions(); + if (keys1 instanceof ExpressionKeys) { + int[] positions = ((ExpressionKeys<?>) keys1).computeLogicalKeyPositions(); ((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions); } else { throw new InvalidProgramException("Currently, the solution set may only be joined with using tuple field positions."); } } if (input2 instanceof SolutionSetPlaceHolder) { - if (keys2 instanceof FieldPositionKeys) { - int[] positions = ((FieldPositionKeys<?>) keys2).computeLogicalKeyPositions(); + if (keys2 instanceof ExpressionKeys) { + int[] positions = ((ExpressionKeys<?>) keys2).computeLogicalKeyPositions(); ((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions); } else { throw new InvalidProgramException("Currently, the solution set may only be joined with using tuple field positions."); @@ -247,12 +247,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow( Operator<I1> input1, Operator<I2> input2) { - - String name = getName() != null ? getName() : function.getClass().getName(); - if (!keys1.areCompatibale(keys2)) { - throw new InvalidProgramException("The types of the key fields do not match. Left:" + - " " + keys1 + " Right: " + keys2); + String name = getName() != null ? getName() : function.getClass().getName(); + try { + keys1.areCompatible(super.keys2); + } catch(IncompatibleKeysException ike) { + throw new InvalidProgramException("The types of the key fields do not match.", ike); } if (keys1 instanceof Keys.SelectorFunctionKeys @@ -315,10 +315,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, return po; } - else if ((super.keys1 instanceof Keys.FieldPositionKeys - && super.keys2 instanceof Keys.FieldPositionKeys) || - ((super.keys1 instanceof Keys.ExpressionKeys - && super.keys2 instanceof Keys.ExpressionKeys))) + else if (super.keys1 instanceof Keys.ExpressionKeys && super.keys2 instanceof Keys.ExpressionKeys) { // Neither side needs the tuple wrapping/unwrapping @@ -765,7 +762,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, * @see DataSet */ public JoinOperatorSetsPredicate where(int... fields) { - return new JoinOperatorSetsPredicate(new Keys.FieldPositionKeys<I1>(fields, input1.getType())); + return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType())); } /** @@ -782,9 +779,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, * @see Tuple * @see DataSet */ -// public JoinOperatorSetsPredicate where(String... fields) { -// return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType())); -// } + public JoinOperatorSetsPredicate where(String... fields) { + return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType())); + } /** * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.</br> @@ -843,7 +840,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, * @return A DefaultJoin that represents the joined DataSet. */ public DefaultJoin<I1, I2> equalTo(int... fields) { - return createJoinOperator(new Keys.FieldPositionKeys<I2>(fields, input2.getType())); + return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType())); } /** @@ -857,9 +854,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, * @param fields The fields of the second join DataSet that should be used as keys. * @return A DefaultJoin that represents the joined DataSet. */ -// public DefaultJoin<I1, I2> equalTo(String... fields) { -// return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType())); -// } + public DefaultJoin<I1, I2> equalTo(String... fields) { + return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType())); + } /** * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.</br> @@ -887,8 +884,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, throw new InvalidProgramException("The join keys may not be empty."); } - if (!keys1.areCompatibale(keys2)) { - throw new InvalidProgramException("The pair of join keys are not compatible with each other."); + try { + keys1.areCompatible(keys2); + } catch (IncompatibleKeysException e) { + throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e); } return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index b19d5c3..653a04a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -18,116 +18,53 @@ package org.apache.flink.api.java.operators; +import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; -public abstract class Keys<T> { +public abstract class Keys<T> { + private static final Logger LOG = LoggerFactory.getLogger(Keys.class); public abstract int getNumberOfKeyFields(); public boolean isEmpty() { return getNumberOfKeyFields() == 0; } - - public abstract boolean areCompatibale(Keys<?> other); - + + /** + * Check if two sets of keys are compatible to each other (matching types, key counts) + */ + public abstract boolean areCompatible(Keys<?> other) throws IncompatibleKeysException; + public abstract int[] computeLogicalKeyPositions(); - + + // -------------------------------------------------------------------------------------------- - // Specializations for field indexed / expression-based / extractor-based grouping + // Specializations for expression-based / extractor-based grouping // -------------------------------------------------------------------------------------------- - - public static class FieldPositionKeys<T> extends Keys<T> { - - private final int[] fieldPositions; - private final TypeInformation<?>[] types; - - public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type) { - this(groupingFields, type, false); - } - - public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type, boolean allowEmpty) { - if (!type.isTupleType()) { - throw new InvalidProgramException("Specifying keys via field positions is only valid" + - "for tuple data types. Type: " + type); - } - - if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) { - throw new IllegalArgumentException("The grouping fields must not be empty."); - } - - TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>)type; - - this.fieldPositions = makeFields(groupingFields, (TupleTypeInfoBase<?>) type); - - types = new TypeInformation[this.fieldPositions.length]; - for(int i = 0; i < this.fieldPositions.length; i++) { - types[i] = tupleType.getTypeAt(this.fieldPositions[i]); - } - - } - - @Override - public int getNumberOfKeyFields() { - return this.fieldPositions.length; - } - - @Override - public boolean areCompatibale(Keys<?> other) { - - if (other instanceof FieldPositionKeys) { - FieldPositionKeys<?> oKey = (FieldPositionKeys<?>) other; - - if(oKey.types.length != this.types.length) { - return false; - } - for(int i=0; i<this.types.length; i++) { - if(!this.types[i].equals(oKey.types[i])) { - return false; - } - } - return true; - - } else if (other instanceof SelectorFunctionKeys) { - if(this.types.length != 1) { - return false; - } - - SelectorFunctionKeys<?, ?> sfk = (SelectorFunctionKeys<?, ?>) other; - - return sfk.keyType.equals(this.types[0]); - } - else { - return false; - } - } - - @Override - public int[] computeLogicalKeyPositions() { - return this.fieldPositions; - } - - @Override - public String toString() { - String fieldsString = Arrays.toString(fieldPositions); - String typesString = Arrays.toString(types); - return "Tuple position key (Fields: " + fieldsString + " Types: " + typesString + ")"; - } - } - - // -------------------------------------------------------------------------------------------- - + + public static class SelectorFunctionKeys<T, K> extends Keys<T> { private final KeySelector<T, K> keyExtractor; private final TypeInformation<K> keyType; + private final int[] logicalKeyFields; public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) { if (keyExtractor == null) { @@ -136,6 +73,15 @@ public abstract class Keys<T> { this.keyExtractor = keyExtractor; this.keyType = keyType; + + // we have to handle a special case here: + // if the keyType is a tuple type, we need to select the full tuple with all its fields. + if(keyType.isTupleType()) { + ExpressionKeys<K> ek = new ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR}, keyType); + logicalKeyFields = ek.computeLogicalKeyPositions(); + } else { + logicalKeyFields = new int[] {0}; + } if (!this.keyType.isKeyType()) { throw new IllegalArgumentException("Invalid type of KeySelector keys"); @@ -152,35 +98,53 @@ public abstract class Keys<T> { @Override public int getNumberOfKeyFields() { - return 1; + return logicalKeyFields.length; } @Override - public boolean areCompatibale(Keys<?> other) { - + public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { + if (other instanceof SelectorFunctionKeys) { @SuppressWarnings("unchecked") SelectorFunctionKeys<?, K> sfk = (SelectorFunctionKeys<?, K>) other; return sfk.keyType.equals(this.keyType); } - else if (other instanceof FieldPositionKeys) { - FieldPositionKeys<?> fpk = (FieldPositionKeys<?>) other; - - if(fpk.types.length != 1) { - return false; + else if (other instanceof ExpressionKeys) { + ExpressionKeys<?> expressionKeys = (ExpressionKeys<?>) other; + + if(keyType.isTupleType()) { + // special case again: + TupleTypeInfo<?> tupleKeyType = (TupleTypeInfo<?>) keyType; + List<FlatFieldDescriptor> keyTypeFields = new ArrayList<FlatFieldDescriptor>(tupleKeyType.getTotalFields()); + tupleKeyType.getKey(ExpressionKeys.SELECT_ALL_CHAR, 0, keyTypeFields); + if(expressionKeys.keyFields.size() != keyTypeFields.size()) { + throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); + } + for(int i=0; i < expressionKeys.keyFields.size(); i++) { + if(!expressionKeys.keyFields.get(i).getType().equals(keyTypeFields.get(i).getType())) { + throw new IncompatibleKeysException(expressionKeys.keyFields.get(i).getType(), keyTypeFields.get(i).getType() ); + } + } + return true; } - - return fpk.types[0].equals(this.keyType); - } - else { - return false; + if(expressionKeys.getNumberOfKeyFields() != 1) { + throw new IncompatibleKeysException("Key selector functions are only compatible to one key"); + } + + if(expressionKeys.keyFields.get(0).getType().equals(this.keyType)) { + return true; + } else { + throw new IncompatibleKeysException(expressionKeys.keyFields.get(0).getType(), this.keyType); + } + } else { + throw new IncompatibleKeysException("The key is not compatible with "+other); } } @Override public int[] computeLogicalKeyPositions() { - return new int[] {0}; + return logicalKeyFields; } @Override @@ -188,93 +152,178 @@ public abstract class Keys<T> { return "Key function (Type: " + keyType + ")"; } } - - // -------------------------------------------------------------------------------------------- - + + + /** + * Represents (nested) field access through string and integer-based keys for Composite Types (Tuple or Pojo) + */ public static class ExpressionKeys<T> extends Keys<T> { + + public static final String SELECT_ALL_CHAR = "*"; + + /** + * Flattened fields representing keys fields + */ + private List<FlatFieldDescriptor> keyFields; + + /** + * two constructors for field-based (tuple-type) keys + */ + public ExpressionKeys(int[] groupingFields, TypeInformation<T> type) { + this(groupingFields, type, false); + } - private int[] logicalPositions; - - private final TypeInformation<?>[] types; - - @SuppressWarnings("unused") - private PojoTypeInfo<?> type; + // int-defined field + public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean allowEmpty) { + if (!type.isTupleType()) { + throw new InvalidProgramException("Specifying keys via field positions is only valid" + + "for tuple data types. Type: " + type); + } - public ExpressionKeys(String[] expressions, TypeInformation<T> type) { - if (!(type instanceof PojoTypeInfo<?>)) { - throw new UnsupportedOperationException("Key expressions can only be used on POJOs." + " " + - "A POJO must have a default constructor without arguments and not have readObject" + - " and/or writeObject methods. A current restriction is that it can only have nested POJOs or primitive (also boxed)" + - " fields."); + if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) { + throw new IllegalArgumentException("The grouping fields must not be empty."); } - PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type; - this.type = pojoType; - logicalPositions = pojoType.getLogicalPositions(expressions); - types = pojoType.getTypes(expressions); - - for (int i = 0; i < logicalPositions.length; i++) { - if (logicalPositions[i] < 0) { - throw new IllegalArgumentException("Expression '" + expressions[i] + "' is not a valid key for POJO" + - " type " + type.toString() + "."); + // select all fields. Therefore, set all fields on this tuple level and let the logic handle the rest + // (makes type assignment easier). + if (groupingFields == null || groupingFields.length == 0) { + groupingFields = new int[type.getArity()]; + for (int i = 0; i < groupingFields.length; i++) { + groupingFields[i] = i; } + } else { + groupingFields = rangeCheckFields(groupingFields, type.getArity() -1); } + TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>)type; + Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point"); + + keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields()); + // for each key, find the field: + for(int j = 0; j < groupingFields.length; j++) { + for(int i = 0; i < type.getArity(); i++) { + TypeInformation<?> fieldType = tupleType.getTypeAt(i); + + if(groupingFields[j] == i) { // check if user set the key + int keyId = countNestedElementsBefore(tupleType, i) + i; + if(fieldType instanceof TupleTypeInfoBase) { + TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType; + tupleFieldType.addAllFields(keyId, keyFields); + } else { + Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type"); + keyFields.add(new FlatFieldDescriptor(keyId, fieldType)); + } + + } + } + } + keyFields = removeNullElementsFromList(keyFields); } - + + private static int countNestedElementsBefore(TupleTypeInfoBase<?> tupleType, int pos) { + if( pos == 0) { + return 0; + } + int ret = 0; + for (int i = 0; i < pos; i++) { + TypeInformation<?> fieldType = tupleType.getTypeAt(i); + ret += fieldType.getTotalFields() -1; + } + return ret; + } + + public static <R> List<R> removeNullElementsFromList(List<R> in) { + List<R> elements = new ArrayList<R>(); + for(R e: in) { + if(e != null) { + elements.add(e); + } + } + return elements; + } + + /** + * Create ExpressionKeys from String-expressions + */ + public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) { + if(!(type instanceof CompositeType<?>)) { + throw new IllegalArgumentException("Type "+type+" is not a composite type. Key expressions are not supported."); + } + CompositeType<T> cType = (CompositeType<T>) type; + + String[] expressions = removeDuplicates(expressionsIn); + if(expressionsIn.length != expressions.length) { + LOG.warn("The key expressions contained duplicates. They are now unique"); + } + // extract the keys on their flat position + keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length); + for (int i = 0; i < expressions.length; i++) { + List<FlatFieldDescriptor> keys = new ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check + cType.getKey(expressions[i], 0, keys); + if(keys.size() == 0) { + throw new IllegalArgumentException("Unable to extract key from expression "+expressions[i]+" on key "+cType); + } + keyFields.addAll(keys); + } + } + @Override public int getNumberOfKeyFields() { - return logicalPositions.length; + if(keyFields == null) { + return 0; + } + return keyFields.size(); } @Override - public boolean areCompatibale(Keys<?> other) { + public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { if (other instanceof ExpressionKeys) { ExpressionKeys<?> oKey = (ExpressionKeys<?>) other; - if(oKey.types.length != this.types.length) { - return false; + if(oKey.getNumberOfKeyFields() != this.getNumberOfKeyFields() ) { + throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); } - for(int i=0; i<this.types.length; i++) { - if(!this.types[i].equals(oKey.types[i])) { - return false; + for(int i=0; i < this.keyFields.size(); i++) { + if(!this.keyFields.get(i).getType().equals(oKey.keyFields.get(i).getType())) { + throw new IncompatibleKeysException(this.keyFields.get(i).getType(), oKey.keyFields.get(i).getType() ); } } return true; - + } else if(other instanceof SelectorFunctionKeys<?, ?>) { + return other.areCompatible(this); } else { - return false; + throw new IncompatibleKeysException("The key is not compatible with "+other); } } @Override public int[] computeLogicalKeyPositions() { - return logicalPositions; + List<Integer> logicalKeys = new LinkedList<Integer>(); + for(FlatFieldDescriptor kd : keyFields) { + logicalKeys.addAll( Ints.asList(kd.getPosition())); + } + return Ints.toArray(logicalKeys); } + + } + + private static String[] removeDuplicates(String[] in) { + List<String> ret = new LinkedList<String>(); + for(String el : in) { + if(!ret.contains(el)) { + ret.add(el); + } + } + return ret.toArray(new String[ret.size()]); } - - + // -------------------------------------------------------------------------------------------- + + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- - private static int[] makeFields(int[] fields, TupleTypeInfoBase<?> type) { - int inLength = type.getArity(); - - // null parameter means all fields are considered - if (fields == null || fields.length == 0) { - fields = new int[inLength]; - for (int i = 0; i < inLength; i++) { - fields[i] = i; - } - return fields; - } else { - return rangeCheckAndOrderFields(fields, inLength-1); - } - } - private static final int[] rangeCheckAndOrderFields(int[] fields, int maxAllowedField) { - // order - Arrays.sort(fields); + private static final int[] rangeCheckFields(int[] fields, int maxAllowedField) { // range check and duplicate eliminate int i = 1, k = 0; @@ -285,12 +334,12 @@ public abstract class Keys<T> { } for (; i < fields.length; i++) { - if (fields[i] < 0 || i > maxAllowedField) { + if (fields[i] < 0 || fields[i] > maxAllowedField) { throw new IllegalArgumentException("Tuple position is out of range."); } - if (fields[i] != last) { k++; + last = fields[i]; fields[k] = fields[i]; } } @@ -299,7 +348,20 @@ public abstract class Keys<T> { if (k == fields.length - 1) { return fields; } else { - return Arrays.copyOfRange(fields, 0, k); + return Arrays.copyOfRange(fields, 0, k+1); + } + } + + public static class IncompatibleKeysException extends Exception { + private static final long serialVersionUID = 1L; + public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different."; + + public IncompatibleKeysException(String message) { + super(message); + } + + public IncompatibleKeysException(TypeInformation<?> typeInformation, TypeInformation<?> typeInformation2) { + super(typeInformation+" and "+typeInformation2+" are not compatible"); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index f0931b5..532e464 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -50,7 +50,7 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition throw new UnsupportedOperationException("Range Partitioning not yet supported"); } - if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) { + if(pKeys instanceof Keys.ExpressionKeys<?> && !input.getType().isTupleType()) { throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets"); } @@ -83,7 +83,7 @@ public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, Partition } else if (pMethod == PartitionMethod.HASH) { - if (pKeys instanceof Keys.FieldPositionKeys) { + if (pKeys instanceof Keys.ExpressionKeys) { int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions(); UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 463b31c..8cb64ba 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -101,8 +101,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe MapOperatorBase<?, IN, ?> po = translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, input, this.getParallelism()); return po; } - else if (grouper.getKeys() instanceof Keys.FieldPositionKeys || - grouper.getKeys() instanceof Keys.ExpressionKeys) { + else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { // reduce with field positions int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index e862b5a..6272538 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -52,6 +52,11 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType public int getArity() { return 1; } + + @Override + public int getTotalFields() { + return 1; + } @Override public Class<T> getTypeClass() { @@ -65,6 +70,9 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType @Override public TypeSerializer<T> createSerializer() { + // NOTE: The TypeExtractor / pojo logic is assuming that we are using a Avro Serializer here + // in particular classes implementing GenericContainer are handled as GenericTypeInfos + // (this will probably not work with Kryo) return new AvroSerializer<T>(this.typeClass); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java index 226557f..55128e6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java @@ -64,6 +64,11 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { public int getArity() { return 1; } + + @Override + public int getTotalFields() { + return 1; + } @SuppressWarnings("unchecked") @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java index 1b8ef35..bf0e25a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java @@ -22,7 +22,7 @@ import java.lang.reflect.Field; import org.apache.flink.api.common.typeinfo.TypeInformation; -class PojoField { +public class PojoField { public Field field; public TypeInformation<?> type; @@ -30,4 +30,9 @@ class PojoField { this.field = field; this.type = type; } -} + + @Override + public String toString() { + return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 51ed507..fba1f24 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -18,33 +18,41 @@ package org.apache.flink.api.java.typeutils; -import com.google.common.base.Joiner; - import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.CompositeType; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.typeutils.runtime.PojoComparator; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import com.google.common.base.Joiner; + /** - * + * TypeInformation for arbitrary (they have to be java-beans-style) java objects (what we call POJO). + * */ -public class PojoTypeInfo<T> extends TypeInformation<T> implements CompositeType<T> { +public class PojoTypeInfo<T> extends CompositeType<T>{ private final Class<T> typeClass; private PojoField[] fields; + + private int totalFields; public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) { + super(typeClass); this.typeClass = typeClass; List<PojoField> tempFields = new ArrayList<PojoField>(fields); Collections.sort(tempFields, new Comparator<PojoField>() { @@ -54,6 +62,14 @@ public class PojoTypeInfo<T> extends TypeInformation<T> implements CompositeType } }); this.fields = tempFields.toArray(new PojoField[tempFields.size()]); + + // check if POJO is public + if(!Modifier.isPublic(typeClass.getModifiers())) { + throw new RuntimeException("POJO "+typeClass+" is not public"); + } + for(PojoField field : fields) { + totalFields += field.type.getTotalFields(); + } } @Override @@ -71,6 +87,11 @@ public class PojoTypeInfo<T> extends TypeInformation<T> implements CompositeType public int getArity() { return fields.length; } + + @Override + public int getTotalFields() { + return totalFields; + } @Override public Class<T> getTypeClass() { @@ -82,18 +103,6 @@ public class PojoTypeInfo<T> extends TypeInformation<T> implements CompositeType return Comparable.class.isAssignableFrom(typeClass); } - @Override - public TypeSerializer<T> createSerializer() { - TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length]; - Field[] reflectiveFields = new Field[fields.length]; - - for (int i = 0; i < fields.length; i++) { - fieldSerializers[i] = fields[i].type.createSerializer(); - reflectiveFields[i] = fields[i].field; - } - - return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields); - } @Override public String toString() { @@ -105,73 +114,124 @@ public class PojoTypeInfo<T> extends TypeInformation<T> implements CompositeType + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" + ">"; } - - public int getLogicalPosition(String fieldExpression) { - for (int i = 0; i < fields.length; i++) { - if (fields[i].field.getName().equals(fieldExpression)) { - return i; + + @Override + public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + // handle 'select all' first + if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) { + int keyPosition = 0; + for(PojoField field : fields) { + if(field.type instanceof AtomicType) { + result.add(new FlatFieldDescriptor(offset + keyPosition, field.type)); + } else if(field.type instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>)field.type; + cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + keyPosition += cType.getTotalFields()-1; + } else { + throw new RuntimeException("Unexpected key type: "+field.type); + } + keyPosition++; } + return; + } + Validate.notEmpty(fieldExpression, "Field expression must not be empty."); + // if there is a dot try getting the field from that sub field + int firstDot = fieldExpression.indexOf('.'); + if (firstDot == -1) { + // this is the last field (or only field) in the field expression + int fieldId = 0; + for (int i = 0; i < fields.length; i++) { + if(fields[i].type instanceof CompositeType) { + fieldId += fields[i].type.getTotalFields()-1; + } + if (fields[i].field.getName().equals(fieldExpression)) { + result.add(new FlatFieldDescriptor(offset + fieldId, fields[i].type)); + return; + } + fieldId++; + } + } else { + // split and go deeper + String firstField = fieldExpression.substring(0, firstDot); + String rest = fieldExpression.substring(firstDot + 1); + int fieldId = 0; + for (int i = 0; i < fields.length; i++) { + if (fields[i].field.getName().equals(firstField)) { + if (!(fields[i].type instanceof CompositeType<?>)) { + throw new RuntimeException("Field "+fields[i].type+" is not composite type"); + } + CompositeType<?> cType = (CompositeType<?>) fields[i].type; + cType.getKey(rest, offset + fieldId, result); // recurse + return; + } + fieldId++; + } + throw new RuntimeException("Unable to find field "+fieldExpression+" in type "+this+" (looking for '"+firstField+"')"); } - return -1; } - public int[] getLogicalPositions(String[] fieldExpression) { - int[] result = new int[fieldExpression.length]; - for (int i = 0; i < fieldExpression.length; i++) { - result[i] = getLogicalPosition(fieldExpression[i]); + @Override + public <X> TypeInformation<X> getTypeAt(int pos) { + if (pos < 0 || pos >= this.fields.length) { + throw new IndexOutOfBoundsException(); } - return result; + @SuppressWarnings("unchecked") + TypeInformation<X> typed = (TypeInformation<X>) fields[pos].type; + return typed; } - public TypeInformation<?> getType(String fieldExpression) { - for (int i = 0; i < fields.length; i++) { - if (fields[i].field.getName().equals(fieldExpression)) { - return fields[i].type; - } + // used for testing. Maybe use mockito here + public PojoField getPojoFieldAt(int pos) { + if (pos < 0 || pos >= this.fields.length) { + throw new IndexOutOfBoundsException(); } - return null; + return this.fields[pos]; } - public TypeInformation<?>[] getTypes(String[] fieldExpression) { - TypeInformation<?>[] result = new TypeInformation<?>[fieldExpression.length]; - for (int i = 0; i < fieldExpression.length; i++) { - result[i] = getType(fieldExpression[i]); - } - return result; + /** + * Comparator creation + */ + private TypeComparator<?>[] fieldComparators; + private Field[] keyFields; + private int comparatorHelperIndex = 0; + @Override + protected void initializeNewComparator(int keyCount) { + fieldComparators = new TypeComparator<?>[keyCount]; + keyFields = new Field[keyCount]; + comparatorHelperIndex = 0; } @Override - public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders) { - // sanity checks - if (logicalKeyFields == null || orders == null || logicalKeyFields.length != orders.length || - logicalKeyFields.length > fields.length) - { - throw new IllegalArgumentException(); - } + protected void addCompareField(int fieldId, TypeComparator<?> comparator) { + fieldComparators[comparatorHelperIndex] = comparator; + keyFields[comparatorHelperIndex] = fields[fieldId].field; + comparatorHelperIndex++; + } -// if (logicalKeyFields.length == 1) { -// return createSinglefieldComparator(logicalKeyFields[0], orders[0], types[logicalKeyFields[0]]); -// } + @Override + protected TypeComparator<T> getNewComparator() { + // first remove the null array fields + final Field[] finalKeyFields = Arrays.copyOf(keyFields, comparatorHelperIndex); + @SuppressWarnings("rawtypes") + final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex); + if(finalFieldComparators.length == 0 || finalKeyFields.length == 0 || finalFieldComparators.length != finalKeyFields.length) { + throw new IllegalArgumentException("Pojo comparator creation has a bug"); + } + return new PojoComparator<T>(finalKeyFields, finalFieldComparators, createSerializer(), typeClass); + } - // create the comparators for the individual fields - TypeComparator<?>[] fieldComparators = new TypeComparator<?>[logicalKeyFields.length]; - Field[] keyFields = new Field[logicalKeyFields.length]; - for (int i = 0; i < logicalKeyFields.length; i++) { - int field = logicalKeyFields[i]; + @Override + public TypeSerializer<T> createSerializer() { + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ]; + Field[] reflectiveFields = new Field[fields.length]; - if (field < 0 || field >= fields.length) { - throw new IllegalArgumentException("The field position " + field + " is out of range [0," + fields.length + ")"); - } - if (fields[field].type.isKeyType() && fields[field].type instanceof AtomicType) { - fieldComparators[i] = ((AtomicType<?>) fields[field].type).createComparator(orders[i]); - keyFields[i] = fields[field].field; - keyFields[i].setAccessible(true); - } else { - throw new IllegalArgumentException("The field at position " + field + " (" + fields[field].type + ") is no atomic key type."); - } + for (int i = 0; i < fields.length; i++) { + fieldSerializers[i] = fields[i].type.createSerializer(); + reflectiveFields[i] = fields[i].field; } - return new PojoComparator<T>(keyFields, fieldComparators, createSerializer(), typeClass); + return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields); } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java index f069eed..2464f25 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java @@ -43,6 +43,11 @@ public class RecordTypeInfo extends TypeInformation<Record> { public int getArity() { return 1; } + + @Override + public int getTotalFields() { + return 1; + } @Override public Class<Record> getTypeClass() { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index 6edb08c..82f9c50 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -20,22 +20,21 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; -import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.TupleComparator; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; - //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import org.apache.flink.api.java.tuple.*; //CHECKSTYLE.ON: AvoidStarImport +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; -public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { +public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { + @SuppressWarnings("unchecked") public TupleTypeInfo(TypeInformation<?>... types) { this((Class<T>) CLASSES[types.length - 1], types); @@ -60,59 +59,72 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { return new TupleSerializer<T>(tupleClass, fieldSerializers); } + /** + * Comparator creation + */ + private TypeSerializer<?>[] fieldSerializers; + private TypeComparator<?>[] fieldComparators; + private int[] logicalKeyFields; + private int comparatorHelperIndex = 0; + @Override - public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders) { - // sanity checks - if (logicalKeyFields == null || orders == null || logicalKeyFields.length != orders.length || - logicalKeyFields.length > types.length) - { - throw new IllegalArgumentException(); - } + protected void initializeNewComparator(int localKeyCount) { + fieldSerializers = new TypeSerializer[localKeyCount]; + fieldComparators = new TypeComparator<?>[localKeyCount]; + logicalKeyFields = new int[localKeyCount]; + comparatorHelperIndex = 0; + } - int maxKey = -1; - for (int key : logicalKeyFields){ - maxKey = Math.max(key, maxKey); - } - - if (maxKey >= this.types.length) { - throw new IllegalArgumentException("The key position " + maxKey + " is out of range for Tuple" + types.length); - } - - // create the comparators for the individual fields - TypeComparator<?>[] fieldComparators = new TypeComparator<?>[logicalKeyFields.length]; - for (int i = 0; i < logicalKeyFields.length; i++) { - int keyPos = logicalKeyFields[i]; - if (types[keyPos].isKeyType() && types[keyPos] instanceof AtomicType) { - fieldComparators[i] = ((AtomicType<?>) types[keyPos]).createComparator(orders[i]); - } else if(types[keyPos].isTupleType() && types[keyPos] instanceof TupleTypeInfo){ // Check for tuple - TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) types[keyPos]; - - // All fields are key - int[] allFieldsKey = new int[tupleType.types.length]; - for(int h = 0; h < tupleType.types.length; h++){ - allFieldsKey[h]=h; - } - - // Prepare order - boolean[] tupleOrders = new boolean[tupleType.types.length]; - Arrays.fill(tupleOrders, orders[i]); - fieldComparators[i] = tupleType.createComparator(allFieldsKey, tupleOrders); - } else { - throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos] + ") is no atomic key type nor tuple type."); - } - } - + @Override + protected void addCompareField(int fieldId, TypeComparator<?> comparator) { + fieldComparators[comparatorHelperIndex] = comparator; + fieldSerializers[comparatorHelperIndex] = types[fieldId].createSerializer(); + logicalKeyFields[comparatorHelperIndex] = fieldId; + comparatorHelperIndex++; + } + + @Override + protected TypeComparator<T> getNewComparator() { + @SuppressWarnings("rawtypes") + final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex); + final int[] finalLogicalKeyFields = Arrays.copyOf(logicalKeyFields, comparatorHelperIndex); + //final TypeSerializer[] finalFieldSerializers = Arrays.copyOf(fieldSerializers, comparatorHelperIndex); // create the serializers for the prefix up to highest key position + int maxKey = 0; + for(int key : finalLogicalKeyFields) { + maxKey = Math.max(maxKey, key); + } TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; for (int i = 0; i <= maxKey; i++) { fieldSerializers[i] = types[i].createSerializer(); } - - return new TupleComparator<T>(logicalKeyFields, fieldComparators, fieldSerializers); + if(finalFieldComparators.length == 0 || finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 + || finalFieldComparators.length != finalLogicalKeyFields.length) { + throw new IllegalArgumentException("Tuple comparator creation has a bug"); + } + return new TupleComparator<T>(finalLogicalKeyFields, finalFieldComparators, fieldSerializers); } // -------------------------------------------------------------------------------------------- - + + @Override + public boolean equals(Object obj) { + if (obj instanceof TupleTypeInfo) { + @SuppressWarnings("unchecked") + TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj; + return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) && + Arrays.deepEquals(this.types, other.types); + + } else { + return false; + } + } + + @Override + public int hashCode() { + return this.types.hashCode() ^ Arrays.deepHashCode(this.types); + } + @Override public String toString() { return "Java " + super.toString(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 3e1b646..4babbd7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -19,19 +19,31 @@ package org.apache.flink.api.java.typeutils; import java.util.Arrays; +import java.util.List; -import org.apache.flink.api.common.typeinfo.CompositeType; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements CompositeType<T> { +import com.google.common.base.Preconditions; + +public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { protected final TypeInformation<?>[] types; protected final Class<T> tupleType; + + private int totalFields; public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) { + super(tupleType); this.tupleType = tupleType; this.types = types; + for(TypeInformation<?> type : types) { + totalFields += type.getTotalFields(); + } } @Override @@ -48,6 +60,11 @@ public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements public int getArity() { return types.length; } + + @Override + public int getTotalFields() { + return totalFields; + } @Override public Class<T> getTypeClass() { @@ -55,6 +72,95 @@ public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements } + /** + * Recursively add all fields in this tuple type. We need this in particular to get all + * the types. + * @param keyId + * @param keyFields + */ + public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) { + for(int i = 0; i < this.getArity(); i++) { + TypeInformation<?> type = this.types[i]; + if(type instanceof AtomicType) { + keyFields.add(new FlatFieldDescriptor(startKeyId, type)); + } else if(type instanceof TupleTypeInfoBase<?>) { + TupleTypeInfoBase<?> ttb = (TupleTypeInfoBase<?>) type; + ttb.addAllFields(startKeyId, keyFields); + } + startKeyId += type.getTotalFields(); + } + } + + + @Override + public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + // handle 'select all' + if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) { + int keyPosition = 0; + for(TypeInformation<?> type : types) { + if(type instanceof AtomicType) { + result.add(new FlatFieldDescriptor(offset + keyPosition, type)); + } else if(type instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>)type; + cType.getKey(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + keyPosition += cType.getTotalFields()-1; + } else { + throw new RuntimeException("Unexpected key type: "+type); + } + keyPosition++; + } + return; + } + // check input + if(fieldExpression.length() < 2) { + throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. The length must be at least 2"); + } + if(fieldExpression.charAt(0) != 'f') { + throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect for a Tuple type. It has to start with an 'f'"); + } + // get first component of nested expression + int dotPos = fieldExpression.indexOf('.'); + String nestedSplitFirst = fieldExpression; + if(dotPos != -1 ) { + Preconditions.checkArgument(dotPos != fieldExpression.length()-1, "The field expression can never end with a dot."); + nestedSplitFirst = fieldExpression.substring(0, dotPos); + } + String fieldNumStr = nestedSplitFirst.substring(1, nestedSplitFirst.length()); + if(!StringUtils.isNumeric(fieldNumStr)) { + throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric"); + } + int pos = -1; + try { + pos = Integer.valueOf(fieldNumStr); + } catch(NumberFormatException nfe) { + throw new IllegalArgumentException("The field expression '"+fieldExpression+"' is incorrect. Field number '"+fieldNumStr+" is not numeric", nfe); + } + if(pos < 0) { + throw new IllegalArgumentException("Negative position is not possible"); + } + // pass down the remainder (after the dot) of the fieldExpression to the type at that position. + if(dotPos != -1) { // we need to go deeper + String rem = fieldExpression.substring(dotPos+1); + if( !(types[pos] instanceof CompositeType<?>) ) { + throw new RuntimeException("Element at position "+pos+" is not a composite type. There are no nested types to select"); + } + CompositeType<?> cType = (CompositeType<?>) types[pos]; + cType.getKey(rem, offset + pos, result); + return; + } + + if(pos >= types.length) { + throw new IllegalArgumentException("The specified tuple position does not exist"); + } + + // count nested fields before "pos". + for(int i = 0; i < pos; i++) { + offset += types[i].getTotalFields() - 1; // this adds only something to offset if its a composite type. + } + + result.add(new FlatFieldDescriptor(offset + pos, types[pos])); + } + public <X> TypeInformation<X> getTypeAt(int pos) { if (pos < 0 || pos >= this.types.length) { throw new IndexOutOfBoundsException(); @@ -115,4 +221,5 @@ public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements bld.append('>'); return bld.toString(); } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 55f6b1f..bc92cdd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.avro.generic.GenericContainer; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; @@ -47,13 +48,19 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; public class TypeExtractor { + private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); // We need this to detect recursive types and not get caught // in an endless recursion @@ -220,6 +227,29 @@ public class TypeExtractor { // get info from hierarchy return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } + + + /** + * @param curT : start type + * @return Type The immediate child of the top class + */ + private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) { + while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).equals( + stopAtClass)) + && !(curT instanceof Class<?> && ((Class<?>) curT).equals(stopAtClass))) { + typeHierarchy.add(curT); + + // parameterized type + if (curT instanceof ParameterizedType) { + curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass(); + } + // class + else { + curT = ((Class<?>) curT).getGenericSuperclass(); + } + } + return curT; + } @SuppressWarnings({ "unchecked", "rawtypes" }) private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, @@ -227,7 +257,7 @@ public class TypeExtractor { // check if type is a subclass of tuple if ((t instanceof Class<?> && Tuple.class.isAssignableFrom((Class<?>) t)) - || (t instanceof ParameterizedType && Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) { + || (t instanceof ParameterizedType && Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) { Type curT = t; @@ -236,7 +266,7 @@ public class TypeExtractor { throw new InvalidTypesException( "Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead."); } - + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) // collect the types while moving up for a later top-down while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).getSuperclass().equals( @@ -295,15 +325,23 @@ public class TypeExtractor { } } - // TODO: Check that type that extends Tuple does not have additional fields. - // Right now, these fields are not be serialized by the TupleSerializer. - // We might want to add an ExtendedTupleSerializer for that. - + Class<?> tAsClass = null; if (t instanceof Class<?>) { - return new TupleTypeInfo(((Class<? extends Tuple>) t), tupleSubTypes); + tAsClass = (Class<?>) t; } else if (t instanceof ParameterizedType) { - return new TupleTypeInfo(((Class<? extends Tuple>) ((ParameterizedType) t).getRawType()), tupleSubTypes); + tAsClass = (Class<? extends Tuple>) ((ParameterizedType) t).getRawType(); } + Preconditions.checkNotNull(tAsClass, "t has a unexpected type"); + // check if the class we assumed to be a Tuple so far is actually a pojo because it contains additional fields. + // check for additional fields. + int fieldCount = countFieldsInClass(tAsClass); + if(fieldCount != tupleSubTypes.length) { + // the class is not a real tuple because it contains additional fields. treat as a pojo + return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>() ); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. + } + + return new TupleTypeInfo(tAsClass, tupleSubTypes); + } // type depends on another type // e.g. class MyMapper<E> extends MapFunction<String, E> @@ -361,16 +399,29 @@ public class TypeExtractor { } // objects with generics are treated as raw type else if (t instanceof ParameterizedType) { - return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType()); + return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), new ArrayList<Type>()); // pass new type hierarchies here because + // while creating the TH here, we assumed a tuple type. } // no tuple, no TypeVariable, no generic type else if (t instanceof Class) { - return privateGetForClass((Class<OUT>) t); + return privateGetForClass((Class<OUT>) t, new ArrayList<Type>()); } throw new InvalidTypesException("Type Information could not be created."); } + private int countFieldsInClass(Class<?> clazz) { + int fieldCount = 0; + for(Field field : clazz.getFields()) { // get all fields + if( !Modifier.isStatic(field.getModifiers()) && + !Modifier.isTransient(field.getModifiers()) + ) { + fieldCount++; + } + } + return fieldCount; + } + private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) { @@ -383,7 +434,7 @@ public class TypeExtractor { else { returnTypeVar = (TypeVariable<?>) matReturnTypeVar; } - + TypeInformation<?> info = null; if (in1TypeInfo != null) { // find the deepest type variable that describes the type of input 1 @@ -806,11 +857,11 @@ public class TypeExtractor { } public static <X> TypeInformation<X> getForClass(Class<X> clazz) { - return new TypeExtractor().privateGetForClass(clazz); + return new TypeExtractor().privateGetForClass(clazz, new ArrayList<Type>()); } @SuppressWarnings("unchecked") - private <X> TypeInformation<X> privateGetForClass(Class<X> clazz) { + private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) { Validate.notNull(clazz); // check for abstract classes or interfaces @@ -819,10 +870,8 @@ public class TypeExtractor { } if (clazz.equals(Object.class)) { - // this will occur when trying to analyze POJOs that have generic, this - // exception will be caught and a GenericTypeInfo will be created for the type. - // at some point we might support this using Kryo - throw new InvalidTypesException("Object is not a valid type."); + // TODO (merging): better throw an exception here. the runtime does not support it yet + return new GenericTypeInfo<X>(clazz); } // check for arrays @@ -879,62 +928,135 @@ public class TypeExtractor { // special case handling for Class, this should not be handled by the POJO logic return new GenericTypeInfo<X>(clazz); } + if(GenericContainer.class.isAssignableFrom(clazz)) { + // this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro. + return new GenericTypeInfo<X>(clazz); + } + TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy); + if (pojoType != null) { + return pojoType; + } -// Disable POJO types for now (see https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/%3C53D96049.1060509%40cse.uta.edu%3E) -// -// TypeInformation<X> pojoType = analyzePojo(clazz); -// if (pojoType != null) { -// return pojoType; -// } // return a generic type return new GenericTypeInfo<X>(clazz); } + + /** + * Checks if the given field is a valid pojo field: + * - it is public + * OR + * - there are getter and setter methods for the field. + * + * @param f field to check + * @param clazz class of field + * @param typeHierarchy type hierarchy for materializing generic types + * @return + */ + private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) { + if(Modifier.isPublic(f.getModifiers())) { + return true; + } else { + boolean hasGetter = false, hasSetter = false; + final String fieldNameLow = f.getName().toLowerCase(); + + Type fieldType = f.getGenericType(); + TypeVariable<?> fieldTypeGeneric = null; + if(fieldType instanceof TypeVariable) { + fieldTypeGeneric = (TypeVariable<?>) fieldType; + fieldType = materializeTypeVariable(typeHierarchy, (TypeVariable<?>)fieldType); + } + for(Method m : clazz.getMethods()) { + // check for getter + + if( // The name should be "get<FieldName>". + m.getName().toLowerCase().contains("get"+fieldNameLow) && + // no arguments for the getter + m.getParameterTypes().length == 0 && + // return type is same as field type (or the generic variant of it) + m.getReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric) ) + ) { + if(hasGetter) { + throw new IllegalStateException("Detected more than one getters"); + } + hasGetter = true; + } + // check for setters + if( m.getName().toLowerCase().contains("set"+fieldNameLow) && + m.getParameterTypes().length == 1 && // one parameter of the field's type + ( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&& + // return type is void. + m.getReturnType().equals(Void.TYPE) + ) { + if(hasSetter) { + throw new IllegalStateException("Detected more than one getters"); + } + hasSetter = true; + } + } + if( hasGetter && hasSetter) { + return true; + } else { + if(!hasGetter) { + LOG.warn("Class "+clazz+" does not contain a getter for field "+f.getName() ); + } + if(!hasSetter) { + LOG.warn("Class "+clazz+" does not contain a setter for field "+f.getName() ); + } + return false; + } + } + } - @SuppressWarnings("unused") - private <X> TypeInformation<X> analyzePojo(Class<X> clazz) { - List<Field> fields = getAllDeclaredFields(clazz); + private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy) { + // try to create Type hierarchy, if the incoming one is empty. + if(typeHierarchy.size() == 0) { + recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class); + } + + List<Field> fields = removeNonObjectFields(getAllDeclaredFields(clazz)); List<PojoField> pojoFields = new ArrayList<PojoField>(); for (Field field : fields) { + Type fieldType = field.getGenericType(); + if(!isValidPojoField(field, clazz, typeHierarchy)) { + LOG.warn("Class "+clazz+" is not a valid POJO type"); + return null; + } try { - if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) { - pojoFields.add(new PojoField(field, privateCreateTypeInfo(field.getType()))); - } + typeHierarchy.add(fieldType); + pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) )); } catch (InvalidTypesException e) { - // If some of the fields cannot be analyzed, just return a generic type info - // right now this happens when a field is an interface (collections are the prominent case here) or - // when the POJO is generic, in which case the fields will have type Object. - // We might fix that in the future when we use Kryo. - return new GenericTypeInfo<X>(clazz); + //pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this + throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+"" + + "\nThe system is using the Avro serializer which is not able to handle all types.", e); } } - PojoTypeInfo<X> pojoType = new PojoTypeInfo<X>(clazz, pojoFields); + CompositeType<X> pojoType = new PojoTypeInfo<X>(clazz, pojoFields); + // + // Validate the correctness of the pojo. + // returning "null" will result create a generic type information. + // List<Method> methods = getAllDeclaredMethods(clazz); - boolean containsReadObjectOrWriteObject = false; for (Method method : methods) { if (method.getName().equals("readObject") || method.getName().equals("writeObject")) { - containsReadObjectOrWriteObject = true; - break; + LOG.warn("Class "+clazz+" contains custom serialization methods we do not call."); + return null; } } // Try retrieving the default constructor, if it does not have one // we cannot use this because the serializer uses it. - boolean hasDefaultCtor = true; try { clazz.getDeclaredConstructor(); } catch (NoSuchMethodException e) { - hasDefaultCtor = false; + LOG.warn("Class "+clazz+" does not have a default constructor. You can not use it as a POJO"); + return null; } - - - if (!containsReadObjectOrWriteObject && hasDefaultCtor) { - return pojoType; - } - - return null; + + // everything is checked, we return the pojo + return pojoType; } // recursively determine all declared fields @@ -950,6 +1072,19 @@ public class TypeExtractor { return result; } + /** + * Remove transient and static fields from a list of fields. + */ + private static List<Field> removeNonObjectFields(List<Field> fields) { + List<Field> result = new ArrayList<Field>(); + for(Field field: fields) { + if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) { + result.add(field); + } + } + return result; + } + // recursively determine all declared methods private static List<Method> getAllDeclaredMethods(Class<?> clazz) { List<Method> result = new ArrayList<Method>(); @@ -976,6 +1111,11 @@ public class TypeExtractor { if (value instanceof Tuple) { Tuple t = (Tuple) value; int numFields = t.getArity(); + if(numFields != countFieldsInClass(value.getClass())) { + // not a tuple since it has more fields. + return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>()); // we immediately call analyze Pojo here, because + // there is currently no other type that can handle such a class. + } TypeInformation<?>[] infos = new TypeInformation[numFields]; for (int i = 0; i < numFields; i++) { @@ -988,10 +1128,9 @@ public class TypeExtractor { infos[i] = privateGetForObject(field); } - return (TypeInformation<X>) new TupleTypeInfo(value.getClass(), infos); } else { - return privateGetForClass((Class<X>) value.getClass()); + return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>()); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index 6894e5a..953b69c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -87,7 +87,7 @@ public class TypeInfoParser { } return (TypeInformation<X>) parse(new StringBuilder(clearedString)); } catch (Exception e) { - throw new IllegalArgumentException("String could not be parsed: " + e.getMessage()); + throw new IllegalArgumentException("String could not be parsed: " + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index 5045b38..b3c25e4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java @@ -53,6 +53,11 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement } @Override + public int getTotalFields() { + return 1; + } + + @Override public Class<T> getTypeClass() { return this.type; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java index 19bcf0b..8c9e948 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java @@ -67,6 +67,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp public int getArity() { return 1; } + + @Override + public int getTotalFields() { + return 1; + } @Override public Class<T> getTypeClass() { @@ -88,6 +93,20 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp return "WritableType<" + typeClass.getName() + ">"; } + @Override + public int hashCode() { + return typeClass.hashCode() ^ 0xd3a2646c; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == WritableTypeInfo.class) { + return typeClass == ((WritableTypeInfo<?>) obj).typeClass; + } else { + return false; + } + } + // -------------------------------------------------------------------------------------------- static final <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java index 0b7890f..9b3b191 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java @@ -43,8 +43,6 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>> private transient T tempReference; - private final Comparable<?>[] extractedKey = new Comparable[1]; - private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; public CopyableValueComparator(boolean ascending, Class<T> type) { @@ -126,13 +124,13 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>> } @Override - public Object[] extractKeys(T record) { - extractedKey[0] = record; - return extractedKey; + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; } @Override - public TypeComparator<?>[] getComparators() { + public TypeComparator<?>[] getFlatComparators() { return comparators; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java index 66cbdf4..2d3ce39 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java @@ -53,9 +53,6 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat private transient Kryo kryo; @SuppressWarnings("rawtypes") - private final Comparable[] extractedKey = new Comparable[1]; - - @SuppressWarnings("rawtypes") private final TypeComparator[] comparators = new TypeComparator[] {this}; // ------------------------------------------------------------------------ @@ -171,14 +168,14 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat } @Override - public Object[] extractKeys(T record) { - extractedKey[0] = record; - return extractedKey; + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; } - @Override @SuppressWarnings("rawtypes") - public TypeComparator[] getComparators() { + @Override + public TypeComparator[] getFlatComparators() { return comparators; }
