Various fixes - improved error messages - the composite vs atomic bug aljoscha found - Comparator test for Pojo Comparator enabled - TODO removed - string-based key expression for group sorting fields definition - support for specifying "select all" using * and now also _ (for scala fans) - Exception if user is having multiple fields with the same name in the class
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fd0be2ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fd0be2ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fd0be2ff Branch: refs/heads/master Commit: fd0be2ff13494ee3739f77b1a4368b5644c55cb8 Parents: 6b493fb Author: Robert Metzger <[email protected]> Authored: Mon Oct 6 17:11:37 2014 +0200 Committer: Robert Metzger <[email protected]> Committed: Wed Oct 8 11:39:01 2014 +0200 ---------------------------------------------------------------------- .../api/common/typeutils/CompositeType.java | 2 +- .../examples/java/wordcount/PojoExample.java | 2 +- .../api/java/operators/GroupReduceOperator.java | 16 +- .../apache/flink/api/java/operators/Keys.java | 7 +- .../api/java/operators/SortedGrouping.java | 75 ++++++- .../api/java/operators/UnsortedGrouping.java | 17 +- .../flink/api/java/typeutils/PojoTypeInfo.java | 10 +- .../api/java/typeutils/TupleTypeInfoBase.java | 7 +- .../flink/api/java/typeutils/TypeExtractor.java | 56 ++++-- .../flink/api/java/operators/KeysTest.java | 74 ++++++- .../type/extractor/PojoTypeExtractionTest.java | 12 ++ .../java/type/extractor/TypeExtractorTest.java | 1 - .../typeutils/runtime/PojoComparatorTest.java | 6 +- .../EnumTriangleOptITCase.java | 2 +- .../exampleScalaPrograms/PageRankITCase.java | 2 +- .../javaApiOperators/GroupReduceITCase.java | 193 ++++++++++++++++++- .../util/CollectionDataSets.java | 52 +++++ 17 files changed, 466 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index 1522ed1..51ad548 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -85,7 +85,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> { && logicalKeyField <= logicalField + (localFieldType.getTotalFields() - 1) ) // check if logical field + lookahead could contain our key ) { // we found a compositeType that is containing the logicalKeyField we are looking for --> create comparator - addCompareField(localFieldId, ((CompositeType<?>) localFieldType).createComparator(new int[] {logicalKeyField}, orders, logicalField)); + addCompareField(localFieldId, ((CompositeType<?>) localFieldType).createComparator(new int[] {logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField)); } // maintain logicalField http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java index a79462e..363c7a3 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java @@ -35,7 +35,7 @@ import org.apache.flink.util.Collector; public class PojoExample { /** - * This is the POJO (Plain Old Java Object) that is bein used + * This is the POJO (Plain Old Java Object) that is being used * for all the operations. 
* As long as all fields are public or have a getter/setter, the system can handle them */ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index cbcc367..1cd85c5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -139,8 +139,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return po; } - else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { //was field position key - //TODO ask stephan + else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); @@ -167,19 +166,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return po; } -// else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { -// -// int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); -// UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); -// GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = -// new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); -// -// po.setCombinable(combinable); -// po.setInput(input); -// po.setDegreeOfParallelism(this.getParallelism()); -// -// return po; -// } else { throw new UnsupportedOperationException("Unrecognized key type."); 
} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 653a04a..482370e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -160,6 +160,7 @@ public abstract class Keys<T> { public static class ExpressionKeys<T> extends Keys<T> { public static final String SELECT_ALL_CHAR = "*"; + public static final String SELECT_ALL_CHAR_SCALA = "_"; /** * Flattened fields representing keys fields @@ -245,7 +246,9 @@ public abstract class Keys<T> { */ public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) { if(!(type instanceof CompositeType<?>)) { - throw new IllegalArgumentException("Type "+type+" is not a composite type. Key expressions are not supported."); + throw new IllegalArgumentException("Type "+type+" is not a composite type. " + + "Key expressions are only supported on POJO types and Tuples. 
" + + "A type is considered a POJO if all its fields are public, or have both getters and setters defined"); } CompositeType<T> cType = (CompositeType<T>) type; @@ -259,7 +262,7 @@ public abstract class Keys<T> { List<FlatFieldDescriptor> keys = new ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check cType.getKey(expressions[i], 0, keys); if(keys.size() == 0) { - throw new IllegalArgumentException("Unable to extract key from expression "+expressions[i]+" on key "+cType); + throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType); } keyFields.addAll(keys); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 78fd48b..c9700ce 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -19,15 +19,20 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.FirstReducer; + import java.util.Arrays; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.typeutils.TypeExtractor; +import com.google.common.base.Preconditions; + /** * SortedGrouping is an intermediate step for a transformation on a grouped and sorted 
DataSet.<br/> @@ -43,6 +48,9 @@ public class SortedGrouping<T> extends Grouping<T> { private int[] groupSortKeyPositions; private Order[] groupSortOrders ; + /* + * int sorting keys for tuples + */ public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) { super(set, keys); @@ -52,10 +60,27 @@ public class SortedGrouping<T> extends Grouping<T> { if (field >= dataSet.getType().getArity()) { throw new IllegalArgumentException("Order key out of tuple bounds."); } + // use int-based expression key to properly resolve nested tuples for grouping + ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType()); + this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); + this.groupSortOrders = new Order[groupSortKeyPositions.length]; + Arrays.fill(this.groupSortOrders, order); + } + + /* + * String sorting for Pojos and tuples + */ + public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) { + super(set, keys); - this.groupSortKeyPositions = new int[]{field}; - this.groupSortOrders = new Order[]{order}; - + if (!(dataSet.getType() instanceof CompositeType)) { + throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); + } + // resolve String-field to int using the expression keys + ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType()); + this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); + this.groupSortOrders = new Order[groupSortKeyPositions.length]; + Arrays.fill(this.groupSortOrders, order); // if field == "*" } protected int[] getGroupSortKeyPositions() { @@ -108,7 +133,7 @@ public class SortedGrouping<T> extends Grouping<T> { /** * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.</br> - * <b>Note: Only groups of Tuple elements can be sorted.</b><br/> + * <b>Note: Only groups of Tuple or Pojo 
elements can be sorted.</b><br/> * Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls. * * @param field The Tuple field on which the group is sorted. @@ -120,22 +145,52 @@ public class SortedGrouping<T> extends Grouping<T> { */ public SortedGrouping<T> sortGroup(int field, Order order) { - int pos; - if (!dataSet.getType().isTupleType()) { throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types"); } if (field >= dataSet.getType().getArity()) { throw new IllegalArgumentException("Order key out of tuple bounds."); } + ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType()); + addSortGroupInternal(ek, order); + return this; + } + + private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) { + Preconditions.checkArgument(order != null, "Order can not be null"); + int[] additionalKeyPositions = ek.computeLogicalKeyPositions(); - int newLength = this.groupSortKeyPositions.length + 1; + int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length; this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength); this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength); - pos = newLength - 1; + int pos = newLength - additionalKeyPositions.length; + int off = newLength - additionalKeyPositions.length; + for(;pos < newLength; pos++) { + this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off]; + this.groupSortOrders[pos] = order; // use the same order + } + } + + /** + * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.</br> + * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/> + * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls. + * + * @param field The Tuple or Pojo field on which the group is sorted. 
+ * @param order The Order in which the specified field is sorted. + * @return A SortedGrouping with specified order of group element. + * + * @see org.apache.flink.api.java.tuple.Tuple + * @see Order + */ + public SortedGrouping<T> sortGroup(String field, Order order) { - this.groupSortKeyPositions[pos] = field; - this.groupSortOrders[pos] = order; + if (! (dataSet.getType() instanceof CompositeType)) { + throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); + } + ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType()); + addSortGroupInternal(ek, order); return this; } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 71b1828..910846d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -196,7 +196,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { /** * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.</br> - * <b>Note: Only groups of Tuple elements can be sorted.</b><br/> + * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/> * Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls. * * @param field The Tuple field on which the group is sorted. 
@@ -210,4 +210,19 @@ public class UnsortedGrouping<T> extends Grouping<T> { return new SortedGrouping<T>(this.dataSet, this.keys, field, order); } + /** + * Sorts Pojos within a group on the specified field in the specified {@link Order}.</br> + * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/> + * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls. + * + * @param field The Tuple or Pojo field on which the group is sorted. + * @param order The Order in which the specified field is sorted. + * @return A SortedGrouping with specified order of group element. + * + * @see Order + */ + public SortedGrouping<T> sortGroup(String field, Order order) { + return new SortedGrouping<T>(this.dataSet, this.keys, field, order); + } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index fba1f24..7d6e4ec 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -118,7 +118,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ @Override public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { // handle 'select all' first - if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) { + if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { int keyPosition = 0; for(PojoField field : fields) { if(field.type instanceof AtomicType) { @@ -145,6 +145,10 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ fieldId += fields[i].type.getTotalFields()-1; } 
if (fields[i].field.getName().equals(fieldExpression)) { + if(fields[i].type instanceof CompositeType) { + throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n" + + "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type"); + } result.add(new FlatFieldDescriptor(offset + fieldId, fields[i].type)); return; } @@ -158,13 +162,13 @@ public class PojoTypeInfo<T> extends CompositeType<T>{ for (int i = 0; i < fields.length; i++) { if (fields[i].field.getName().equals(firstField)) { if (!(fields[i].type instanceof CompositeType<?>)) { - throw new RuntimeException("Field "+fields[i].type+" is not composite type"); + throw new RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') is not a composite type"); } CompositeType<?> cType = (CompositeType<?>) fields[i].type; cType.getKey(rest, offset + fieldId, result); // recurse return; } - fieldId++; + fieldId += fields[i].type.getTotalFields(); } throw new RuntimeException("Unable to find field "+fieldExpression+" in type "+this+" (looking for '"+firstField+"')"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 4babbd7..dc75b2c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -95,7 +95,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { @Override public void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { // handle 
'select all' - if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) { + if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { int keyPosition = 0; for(TypeInformation<?> type : types) { if(type instanceof AtomicType) { @@ -157,7 +157,10 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { for(int i = 0; i < pos; i++) { offset += types[i].getTotalFields() - 1; // this adds only something to offset if its a composite type. } - + if(types[pos] instanceof CompositeType) { + throw new IllegalArgumentException("The specified field '"+fieldExpression+"' is refering to a composite type.\n" + + "Either select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field in the sub-type"); + } result.add(new FlatFieldDescriptor(offset + pos, types[pos])); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 5d216e9..5935e31 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -337,7 +337,7 @@ public class TypeExtractor { int fieldCount = countFieldsInClass(tAsClass); if(fieldCount != tupleSubTypes.length) { // the class is not a real tuple because it contains additional fields. treat as a pojo - return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>() ); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. 
+ return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. } return new TupleTypeInfo(tAsClass, tupleSubTypes); @@ -399,7 +399,7 @@ public class TypeExtractor { } // objects with generics are treated as raw type else if (t instanceof ParameterizedType) { //TODO - return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), typeHierarchy); + return privateGetForClass((Class<OUT>) ((ParameterizedType) t).getRawType(), typeHierarchy, (ParameterizedType) t); } // no tuple, no TypeVariable, no generic type else if (t instanceof Class) { @@ -859,8 +859,11 @@ public class TypeExtractor { return new TypeExtractor().privateGetForClass(clazz, new ArrayList<Type>()); } - @SuppressWarnings("unchecked") private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) { + return privateGetForClass(clazz, typeHierarchy, null); + } + @SuppressWarnings("unchecked") + private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) { Validate.notNull(clazz); // check for abstract classes or interfaces @@ -931,7 +934,7 @@ public class TypeExtractor { // this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro. return new GenericTypeInfo<X>(clazz); } - TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy); + TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint); if (pojoType != null) { return pojoType; } @@ -968,7 +971,7 @@ public class TypeExtractor { // check for getter if( // The name should be "get<FieldName>" or "<fieldName>" (for scala). 
- (m.getName().toLowerCase().contains("get"+fieldNameLow) || m.getName().toLowerCase().contains(fieldNameLow)) && + (m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) && // no arguments for the getter m.getParameterTypes().length == 0 && // return type is same as field type (or the generic variant of it) @@ -980,7 +983,7 @@ public class TypeExtractor { hasGetter = true; } // check for setters (<FieldName>_$eq for scala) - if((m.getName().toLowerCase().contains("set"+fieldNameLow) || m.getName().toLowerCase().contains(fieldNameLow+"_$eq")) && + if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) && m.getParameterTypes().length == 1 && // one parameter of the field's type ( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&& // return type is void. @@ -1006,13 +1009,16 @@ public class TypeExtractor { } } - private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy) { + private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) { // try to create Type hierarchy, if the incoming one is empty. 
if(typeHierarchy.size() == 0) { recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class); } + if(clazzTypeHint != null) { + recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class); + } - List<Field> fields = removeNonObjectFields(getAllDeclaredFields(clazz)); + List<Field> fields = getAllDeclaredFields(clazz); List<PojoField> pojoFields = new ArrayList<PojoField>(); for (Field field : fields) { Type fieldType = field.getGenericType(); @@ -1057,32 +1063,40 @@ public class TypeExtractor { return pojoType; } - // recursively determine all declared fields - private static List<Field> getAllDeclaredFields(Class<?> clazz) { + /** + * recursively determine all declared fields + * This is required because getFields() is not returning + */ + public static List<Field> getAllDeclaredFields(Class<?> clazz) { List<Field> result = new ArrayList<Field>(); while (clazz != null) { Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { + if(Modifier.isTransient(field.getModifiers()) || Modifier.isStatic(field.getModifiers())) { + continue; // we have no use for transient or static fields + } + if(hasFieldWithSameName(field.getName(), result)) { + throw new RuntimeException("The field "+field+" is already contained in the hierarchy of the class "+clazz+"." + + "Please use unique field names through your classes hierarchy"); + } result.add(field); } clazz = clazz.getSuperclass(); } return result; } - - /** - * Remove transient and static fields from a list of fields. 
- */ - private static List<Field> removeNonObjectFields(List<Field> fields) { - List<Field> result = new ArrayList<Field>(); - for(Field field: fields) { - if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) { - result.add(field); + + private static boolean hasFieldWithSameName(String name, List<Field> fields) { + for(Field field : fields) { + if(name.equals(field.getName())) { + return true; } } - return result; + return false; } + + // recursively determine all declared methods private static List<Method> getAllDeclaredMethods(Class<?> clazz) { List<Method> result = new ArrayList<Method>(); @@ -1111,7 +1125,7 @@ public class TypeExtractor { int numFields = t.getArity(); if(numFields != countFieldsInClass(value.getClass())) { // not a tuple since it has more fields. - return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>()); // we immediately call analyze Pojo here, because + return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null); // we immediately call analyze Pojo here, because // there is currently no other type that can handle such a class. } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java index 00ab520..37a117f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.flink.api.java.operators; import java.lang.reflect.InvocationTargetException; @@ -24,10 +23,13 @@ import java.util.Arrays; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -93,7 +95,7 @@ public class KeysTest { } @Test - public void testInvalid() throws Throwable { + public void testInvalidTuple() throws Throwable { TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>( BasicTypeInfo.STRING_TYPE_INFO, new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO), @@ -101,7 +103,12 @@ public class KeysTest { ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>> fpk; String[][] tests = new String[][] { - new String[] {"f11"},new String[] {"f-35"}, new String[] {"f0.f33"}, new String[] {"f1.f33"} + new String[] {"f0.f1"}, // nesting into unnested + new String[] {"f11"}, + new String[] {"f-35"}, + new String[] {"f0.f33"}, + new String[] {"f1.f33"}, + new String[] {"f1"} // select full tuple without saying "f1.*" }; for(int i = 0; i < tests.length; i++) { Throwable e = null; @@ -115,6 +122,28 @@ public class KeysTest { } } + @Test + public void testInvalidPojo() throws Throwable { + TypeInformation<ComplexNestedClass> ti = TypeExtractor.getForClass(ComplexNestedClass.class); + 
ExpressionKeys<ComplexNestedClass> ek; + + String[][] tests = new String[][] { + new String[] {"nonexistent"}, + new String[] {"date.abc"}, // nesting into unnested + new String[] {"word"} // select full tuple without saying "f1.*" + }; + for(int i = 0; i < tests.length; i++) { + Throwable e = null; + try { + ek = new ExpressionKeys<ComplexNestedClass>(tests[i], ti); + } catch(Throwable t) { + // System.err.println("Message: "+t.getMessage()); t.printStackTrace(); + e = t; + } + Assert.assertNotNull(e); + } + } + @Test public void testTupleKeyExpansion() { TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>( @@ -144,6 +173,10 @@ public class KeysTest { fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"*"}, typeInfo); Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); + // scala style "select all" + fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"_"}, typeInfo); + Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); + // this was a bug: fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f2"}, typeInfo); Assert.assertArrayEquals(new int[] {4}, fpk.computeLogicalKeyPositions()); @@ -177,10 +210,45 @@ public class KeysTest { complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"*"}, complexTypeInfo); Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions()); + // scala style select all + complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"_"}, complexTypeInfo); + Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions()); + complexFpk = new 
ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f1.f0.*"}, complexTypeInfo); Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions()); complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f2"}, complexTypeInfo); Assert.assertArrayEquals(new int[] {6}, complexFpk.computeLogicalKeyPositions()); } + + public static class Pojo1 { + public String a; + public String b; + } + public static class Pojo2 { + public String a2; + public String b2; + } + public static class PojoWithMultiplePojos { + public Pojo1 p1; + public Pojo2 p2; + public Integer i0; + } + + @Test + public void testPojoKeys() { + TypeInformation<PojoWithMultiplePojos> ti = TypeExtractor.getForClass(PojoWithMultiplePojos.class); + ExpressionKeys<PojoWithMultiplePojos> ek; + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"*"}, ti); + Assert.assertArrayEquals(new int[] {0,1,2,3,4}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p1.*"}, ti); + Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p2.*"}, ti); + Assert.assertArrayEquals(new int[] {3,4}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti); + Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 34dbd99..01d32c1 100644 
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -50,6 +50,14 @@ import com.google.common.collect.HashMultiset; */ public class PojoTypeExtractionTest { + public static class HasDuplicateField extends WC { + private int count; // duplicate + } + + @Test(expected=RuntimeException.class) + public void testDuplicateFieldException() { + TypeExtractor.createTypeInfo(HasDuplicateField.class); + } // test with correct pojo types public static class WC { // is a pojo @@ -210,6 +218,10 @@ public class PojoTypeExtractionTest { } ffd.clear(); + // scala style full tuple selection for pojos + pojoType.getKey("complex.word._", 0, ffd); + Assert.assertEquals(3, ffd.size()); + ffd.clear(); pojoType.getKey("complex.*", 0, ffd); Assert.assertEquals(8, ffd.size()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index b4b1c19..695a42e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -1247,7 +1247,6 @@ public class TypeExtractorTest { public static class InType extends MyObject<String> {} @SuppressWarnings({ "rawtypes", "unchecked" }) @Test -// @Ignore public void testParamertizedCustomObject() { RichMapFunction<?, ?> function = new RichMapFunction<InType, MyObject<String>>() { private static final long serialVersionUID = 1L; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java index 61f6167..e53f48a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java @@ -28,10 +28,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.junit.Assert; -import org.junit.Ignore; -//@Ignore // TODO public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> { TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class); @@ -39,7 +37,7 @@ public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> new PojoContainingTuple(1, 1L, 1L), new PojoContainingTuple(2, 2L, 2L), new PojoContainingTuple(8519, 85190L, 85190L), - new PojoContainingTuple(-51498, 85191L, 85191L), + new PojoContainingTuple(8520, 85191L, 85191L), }; @Override @@ -48,7 +46,7 @@ public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type; ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType); boolean[] orders = new boolean[keys.getNumberOfKeyFields()]; - Arrays.fill(orders, true); + Arrays.fill(orders, ascending); return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0); } 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java index 4f00b1d..9701086 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.test.exampleScalaPrograms; -import org.apache.flink.examples.java.graph.EnumTrianglesOpt; +import org.apache.flink.examples.scala.graph.EnumTrianglesOpt; import org.apache.flink.test.testdata.EnumTriangleData; import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java index 42c6a8e..bc026c9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java @@ -24,7 +24,7 @@ import java.util.Collection; import java.util.LinkedList; import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.java.graph.PageRankBasic; +import org.apache.flink.examples.scala.graph.PageRankBasic; import org.apache.flink.test.testdata.PageRankData; import org.apache.flink.test.util.JavaProgramTestBase; import 
org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index fce56d1..63c4dfc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -53,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) public class GroupReduceITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 19; + private static int NUM_PROGRAMS = 26; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -598,7 +598,175 @@ public class GroupReduceITCase extends JavaProgramTestBase { // return expected result return "3\n1\n"; - } + } + case 20: { + /* + * Test string-based definition on group sort, based on test: + * check correctness of groupReduce with descending group sort + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. 
+ groupBy(1).sortGroup("f2", Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,Hi\n" + + "5,2,Hello world-Hello\n" + + "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" + + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" + + "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" + + "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"; + + } + case 21: { + /* + * Test int-based definition on group sort, for (full) nested Tuple + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env); + DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new NestedTupleReducer()); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "a--(1,1)-(1,2)-(1,3)-\n" + + "b--(2,2)-\n"+ + "c--(3,3)-(3,6)-(3,9)-\n"; + } + case 22: { + /* + * Test int-based definition on group sort, for (partial) nested Tuple ASC + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env); + // f0.f0 is first integer + DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.ASCENDING).reduceGroup(new NestedTupleReducer()); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "a--(1,3)-(1,2)-(2,1)-\n" + + "b--(2,2)-\n"+ + "c--(3,3)-(3,6)-(4,9)-\n"; + } + case 23: { + /* + * Test string-based definition on group sort, for (partial) nested Tuple DESC + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + 
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env); + // f0.f0 is first integer + DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).reduceGroup(new NestedTupleReducer()); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "a--(2,1)-(1,3)-(1,2)-\n" + + "b--(2,2)-\n"+ + "c--(4,9)-(3,3)-(3,6)-\n"; + } + case 24: { + /* + * Test string-based definition on group sort, for two grouping keys + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env); + // f0.f0 is first integer + DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", Order.DESCENDING).reduceGroup(new NestedTupleReducer()); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "a--(2,1)-(1,3)-(1,2)-\n" + + "b--(2,2)-\n"+ + "c--(4,9)-(3,6)-(3,3)-\n"; + } + case 25: { + /* + * Test string-based definition on group sort, for two grouping keys with Pojos + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env); + // f0.f0 is first integer + DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING) + .reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String>() { + @Override + public void reduce( + Iterable<PojoContainingTupleAndWritable> values, + Collector<String> out) throws Exception { + boolean once = false; + StringBuilder concat = new StringBuilder(); + for(PojoContainingTupleAndWritable value : values) { + if(!once) { + 
concat.append(value.hadoopFan.get()); + concat.append("---"); + once = true; + } + concat.append(value.theTuple); + concat.append("-"); + } + out.collect(concat.toString()); + } + }); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "1---(10,100)-\n" + + "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n"; + } + case 26: { + /* + * Test grouping with pojo containing multiple pojos (was a bug) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getPojoWithMultiplePojos(env); + // f0.f0 is first integer + DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING) + .reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String>() { + @Override + public void reduce( + Iterable<PojoContainingTupleAndWritable> values, + Collector<String> out) throws Exception { + boolean once = false; + StringBuilder concat = new StringBuilder(); + for(PojoContainingTupleAndWritable value : values) { + if(!once) { + concat.append(value.hadoopFan.get()); + concat.append("---"); + once = true; + } + concat.append(value.theTuple); + concat.append("-"); + } + out.collect(concat.toString()); + } + }); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "1---(10,100)-\n" + + "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n"; + } + default: { throw new IllegalArgumentException("Invalid program id"); } @@ -607,6 +775,27 @@ public class GroupReduceITCase extends JavaProgramTestBase { } + + public static class NestedTupleReducer implements GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> { + @Override + public void reduce( + Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values, + Collector<String> out) + throws Exception { + boolean once = 
false; + StringBuilder concat = new StringBuilder(); + for(Tuple2<Tuple2<Integer, Integer>, String> value : values) { + if(!once) { + concat.append(value.f1).append("--"); + once = true; + } + concat.append(value.f0); // the tuple with the sorted groups + concat.append("-"); + } + out.collect(concat.toString()); + } + } + public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index d023287..e5ab29a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -155,6 +155,26 @@ public class CollectionDataSets { return env.fromCollection(data, type); } + public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) { + + List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>(); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(1,3), "a")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(1,2), "a")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(2,1), "a")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(2,2), "b")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(3,3), "c")); + 
data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(3,6), "c")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(4,9), "c")); + + TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new + TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>>( + new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO), + BasicTypeInfo.STRING_TYPE_INFO + ); + + return env.fromCollection(data, type); + } + public static DataSet<String> getStringDataSet(ExecutionEnvironment env) { List<String> data = new ArrayList<String>(); @@ -418,5 +438,37 @@ public class CollectionDataSets { data.add(new Tuple3<Integer,CrazyNested, POJO>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); // 1x return env.fromCollection(data); } + + public static class Pojo1 { + public String a; + public String b; + } + public static class Pojo2 { + public String a2; + public String b2; + } + public static class PojoWithMultiplePojos { + public Pojo1 p1; + public Pojo2 p2; + public Integer i0; + public PojoWithMultiplePojos() {} + public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) { + p1 = new Pojo1(); + p1.a = a; + p1.b = b; + p2 = new Pojo2(); + p2.a2 = a1; + p2.b2 = b1; + this.i0 = i0; + } + } + + public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) { + List<PojoWithMultiplePojos> data = new ArrayList<PojoWithMultiplePojos>(); + data.add(new PojoWithMultiplePojos("a","aa","b","bb", 1)); + data.add(new PojoWithMultiplePojos("b","bb","c","cc", 2)); + data.add(new PojoWithMultiplePojos("d","dd","e","ee", 3)); + return env.fromCollection(data); + } }
