Various fixes
- improved error messages
- fixed the composite vs. atomic type bug that Aljoscha found
- Comparator test for Pojo Comparator enabled
- TODO removed
- string-based key expression for group sorting fields definition
- support for specifying "select all" using * and now also _ (for scala fans)
- Exception if the user has multiple fields with the same name in the class hierarchy


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fd0be2ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fd0be2ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fd0be2ff

Branch: refs/heads/master
Commit: fd0be2ff13494ee3739f77b1a4368b5644c55cb8
Parents: 6b493fb
Author: Robert Metzger <[email protected]>
Authored: Mon Oct 6 17:11:37 2014 +0200
Committer: Robert Metzger <[email protected]>
Committed: Wed Oct 8 11:39:01 2014 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/CompositeType.java     |   2 +-
 .../examples/java/wordcount/PojoExample.java    |   2 +-
 .../api/java/operators/GroupReduceOperator.java |  16 +-
 .../apache/flink/api/java/operators/Keys.java   |   7 +-
 .../api/java/operators/SortedGrouping.java      |  75 ++++++-
 .../api/java/operators/UnsortedGrouping.java    |  17 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  10 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |   7 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  56 ++++--
 .../flink/api/java/operators/KeysTest.java      |  74 ++++++-
 .../type/extractor/PojoTypeExtractionTest.java  |  12 ++
 .../java/type/extractor/TypeExtractorTest.java  |   1 -
 .../typeutils/runtime/PojoComparatorTest.java   |   6 +-
 .../EnumTriangleOptITCase.java                  |   2 +-
 .../exampleScalaPrograms/PageRankITCase.java    |   2 +-
 .../javaApiOperators/GroupReduceITCase.java     | 193 ++++++++++++++++++-
 .../util/CollectionDataSets.java                |  52 +++++
 17 files changed, 466 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 1522ed1..51ad548 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -85,7 +85,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                                                && logicalKeyField <= 
logicalField + (localFieldType.getTotalFields() - 1) ) // check if logical 
field + lookahead could contain our key
                                                ) {
                                        // we found a compositeType that is 
containing the logicalKeyField we are looking for --> create comparator
-                                       addCompareField(localFieldId, 
((CompositeType<?>) localFieldType).createComparator(new int[] 
{logicalKeyField}, orders, logicalField));
+                                       addCompareField(localFieldId, 
((CompositeType<?>) localFieldType).createComparator(new int[] 
{logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField));
                                }
                                
                                // maintain logicalField

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
index a79462e..363c7a3 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
@@ -35,7 +35,7 @@ import org.apache.flink.util.Collector;
 public class PojoExample {
        
        /**
-        * This is the POJO (Plain Old Java Object) that is bein used
+        * This is the POJO (Plain Old Java Object) that is being used
         * for all the operations.
         * As long as all fields are public or have a getter/setter, the system 
can handle them
         */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index cbcc367..1cd85c5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -139,8 +139,7 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
                        
                        return po;
                }
-               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { 
//was field position key
-                       //TODO ask stephan
+               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
 
                        int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
                        UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
@@ -167,19 +166,6 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
                        
                        return po;
                }
-//             else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
-//
-//                     int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
-//                     UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-//                     GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>> po =
-//                                     new GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, 
name);
-//
-//                     po.setCombinable(combinable);
-//                     po.setInput(input);
-//                     po.setDegreeOfParallelism(this.getParallelism());
-//
-//                     return po;
-//             }
                else {
                        throw new UnsupportedOperationException("Unrecognized 
key type.");
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 653a04a..482370e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -160,6 +160,7 @@ public abstract class Keys<T> {
        public static class ExpressionKeys<T> extends Keys<T> {
                
                public static final String SELECT_ALL_CHAR = "*";
+               public static final String SELECT_ALL_CHAR_SCALA = "_";
                
                /**
                 * Flattened fields representing keys fields
@@ -245,7 +246,9 @@ public abstract class Keys<T> {
                 */
                public ExpressionKeys(String[] expressionsIn, 
TypeInformation<T> type) {
                        if(!(type instanceof CompositeType<?>)) {
-                               throw new IllegalArgumentException("Type 
"+type+" is not a composite type. Key expressions are not supported.");
+                               throw new IllegalArgumentException("Type 
"+type+" is not a composite type. "
+                                               + "Key expressions are only 
supported on POJO types and Tuples. "
+                                               + "A type is considered a POJO 
if all its fields are public, or have both getters and setters defined");
                        }
                        CompositeType<T> cType = (CompositeType<T>) type;
                        
@@ -259,7 +262,7 @@ public abstract class Keys<T> {
                                List<FlatFieldDescriptor> keys = new 
ArrayList<FlatFieldDescriptor>(); // use separate list to do a size check
                                cType.getKey(expressions[i], 0, keys);
                                if(keys.size() == 0) {
-                                       throw new 
IllegalArgumentException("Unable to extract key from expression 
"+expressions[i]+" on key "+cType);
+                                       throw new 
IllegalArgumentException("Unable to extract key from expression 
'"+expressions[i]+"' on key "+cType);
                                }
                                keyFields.addAll(keys);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 78fd48b..c9700ce 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -19,15 +19,20 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.FirstReducer;
+
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * SortedGrouping is an intermediate step for a transformation on a grouped 
and sorted DataSet.<br/>
@@ -43,6 +48,9 @@ public class SortedGrouping<T> extends Grouping<T> {
        private int[] groupSortKeyPositions;
        private Order[] groupSortOrders ;
        
+       /*
+        * int sorting keys for tuples
+        */
        public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order 
order) {
                super(set, keys);
                
@@ -52,10 +60,27 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (field >= dataSet.getType().getArity()) {
                        throw new IllegalArgumentException("Order key out of 
tuple bounds.");
                }
+               // use int-based expression key to properly resolve nested 
tuples for grouping
+               ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, 
dataSet.getType());
+               this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
+               this.groupSortOrders = new Order[groupSortKeyPositions.length];
+               Arrays.fill(this.groupSortOrders, order);
+       }
+       
+       /*
+        * String sorting for Pojos and tuples
+        */
+       public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order 
order) {
+               super(set, keys);
                
-               this.groupSortKeyPositions = new int[]{field};
-               this.groupSortOrders = new Order[]{order};
-               
+               if (!(dataSet.getType() instanceof CompositeType)) {
+                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for composite data types (pojo / tuple / 
case class)");
+               }
+               // resolve String-field to int using the expression keys
+               ExpressionKeys<T> ek = new ExpressionKeys<T>(new 
String[]{field}, dataSet.getType());
+               this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
+               this.groupSortOrders = new Order[groupSortKeyPositions.length];
+               Arrays.fill(this.groupSortOrders, order); // if field == "*"
        }
        
        protected int[] getGroupSortKeyPositions() {
@@ -108,7 +133,7 @@ public class SortedGrouping<T> extends Grouping<T> {
        
        /**
         * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within 
a group on the specified field in the specified {@link Order}.</br>
-        * <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
+        * <b>Note: Only groups of Tuple or Pojo elements can be 
sorted.</b><br/>
         * Groups can be sorted by multiple fields by chaining {@link 
#sortGroup(int, Order)} calls.
         * 
         * @param field The Tuple field on which the group is sorted.
@@ -120,22 +145,52 @@ public class SortedGrouping<T> extends Grouping<T> {
         */
        public SortedGrouping<T> sortGroup(int field, Order order) {
                
-               int pos;
-               
                if (!dataSet.getType().isTupleType()) {
                        throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for tuple data types");
                }
                if (field >= dataSet.getType().getArity()) {
                        throw new IllegalArgumentException("Order key out of 
tuple bounds.");
                }
+               ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, 
dataSet.getType());
+               addSortGroupInternal(ek, order);
+               return this;
+       }
+       
+       private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
+               Preconditions.checkArgument(order != null, "Order can not be 
null");
+               int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
                
-               int newLength = this.groupSortKeyPositions.length + 1;
+               int newLength = this.groupSortKeyPositions.length + 
additionalKeyPositions.length;
                this.groupSortKeyPositions = 
Arrays.copyOf(this.groupSortKeyPositions, newLength);
                this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, 
newLength);
-               pos = newLength - 1;
+               int pos = newLength - additionalKeyPositions.length;
+               int off = newLength - additionalKeyPositions.length;
+               for(;pos < newLength; pos++) {
+                       this.groupSortKeyPositions[pos] = 
additionalKeyPositions[pos - off];
+                       this.groupSortOrders[pos] = order; // use the same order
+               }
+       }
+       
+       /**
+        * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements 
within a group on the specified field in the specified {@link Order}.</br>
+        * <b>Note: Only groups of Tuple or Pojo elements can be 
sorted.</b><br/>
+        * Groups can be sorted by multiple fields by chaining {@link 
#sortGroup(String, Order)} calls.
+        * 
+        * @param field The Tuple or Pojo field on which the group is sorted.
+        * @param order The Order in which the specified field is sorted.
+        * @return A SortedGrouping with specified order of group element.
+        * 
+        * @see org.apache.flink.api.java.tuple.Tuple
+        * @see Order
+        */
+       public SortedGrouping<T> sortGroup(String field, Order order) {
                
-               this.groupSortKeyPositions[pos] = field;
-               this.groupSortOrders[pos] = order;
+               if (! (dataSet.getType() instanceof CompositeType)) {
+                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for composite data types (pojo / tuple / 
case class)");
+               }
+               ExpressionKeys<T> ek = new ExpressionKeys<T>(new 
String[]{field}, dataSet.getType());
+               addSortGroupInternal(ek, order);
                return this;
        }
+       
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 71b1828..910846d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -196,7 +196,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
        
        /**
         * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within 
a group on the specified field in the specified {@link Order}.</br>
-        * <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
+        * <b>Note: Only groups of Tuple elements and Pojos can be 
sorted.</b><br/>
         * Groups can be sorted by multiple fields by chaining {@link 
#sortGroup(int, Order)} calls.
         * 
         * @param field The Tuple field on which the group is sorted.
@@ -210,4 +210,19 @@ public class UnsortedGrouping<T> extends Grouping<T> {
                return new SortedGrouping<T>(this.dataSet, this.keys, field, 
order);
        }
        
+       /**
+        * Sorts Pojos within a group on the specified field in the specified 
{@link Order}.</br>
+        * <b>Note: Only groups of Tuple elements and Pojos can be 
sorted.</b><br/>
+        * Groups can be sorted by multiple fields by chaining {@link 
#sortGroup(String, Order)} calls.
+        * 
+        * @param field The Tuple or Pojo field on which the group is sorted.
+        * @param order The Order in which the specified field is sorted.
+        * @return A SortedGrouping with specified order of group element.
+        * 
+        * @see Order
+        */
+       public SortedGrouping<T> sortGroup(String field, Order order) {
+               return new SortedGrouping<T>(this.dataSet, this.keys, field, 
order);
+       }
+       
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index fba1f24..7d6e4ec 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -118,7 +118,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
        @Override
        public void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
                // handle 'select all' first
-               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) {
+               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
                        int keyPosition = 0;
                        for(PojoField field : fields) {
                                if(field.type instanceof AtomicType) {
@@ -145,6 +145,10 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
                                        fieldId += 
fields[i].type.getTotalFields()-1;
                                }
                                if 
(fields[i].field.getName().equals(fieldExpression)) {
+                                       if(fields[i].type instanceof 
CompositeType) {
+                                               throw new 
IllegalArgumentException("The specified field '"+fieldExpression+"' is refering 
to a composite type.\n"
+                                                               + "Either 
select all elements in this type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' 
operator or specify a field in the sub-type");
+                                       }
                                        result.add(new 
FlatFieldDescriptor(offset + fieldId, fields[i].type));
                                        return;
                                }
@@ -158,13 +162,13 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
                        for (int i = 0; i < fields.length; i++) {
                                if 
(fields[i].field.getName().equals(firstField)) {
                                        if (!(fields[i].type instanceof 
CompositeType<?>)) {
-                                               throw new 
RuntimeException("Field "+fields[i].type+" is not composite type");
+                                               throw new 
RuntimeException("Field "+fields[i].type+" (specified by '"+fieldExpression+"') 
is not a composite type");
                                        }
                                        CompositeType<?> cType = 
(CompositeType<?>) fields[i].type;
                                        cType.getKey(rest, offset + fieldId, 
result); // recurse
                                        return;
                                }
-                               fieldId++;
+                               fieldId += fields[i].type.getTotalFields();
                        }
                        throw new RuntimeException("Unable to find field 
"+fieldExpression+" in type "+this+" (looking for '"+firstField+"')");
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 4babbd7..dc75b2c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -95,7 +95,7 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
        @Override
        public void getKey(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
                // handle 'select all'
-               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR)) {
+               if(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
                        int keyPosition = 0;
                        for(TypeInformation<?> type : types) {
                                if(type instanceof AtomicType) {
@@ -157,7 +157,10 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                for(int i = 0; i < pos; i++) {
                        offset += types[i].getTotalFields() - 1; // this adds 
only something to offset if its a composite type.
                }
-               
+               if(types[pos] instanceof CompositeType) {
+                       throw new IllegalArgumentException("The specified field 
'"+fieldExpression+"' is refering to a composite type.\n"
+                                       + "Either select all elements in this 
type with the '"+ExpressionKeys.SELECT_ALL_CHAR+"' operator or specify a field 
in the sub-type");
+               }
                result.add(new FlatFieldDescriptor(offset + pos, types[pos]));
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 5d216e9..5935e31 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -337,7 +337,7 @@ public class TypeExtractor {
                        int fieldCount = countFieldsInClass(tAsClass);
                        if(fieldCount != tupleSubTypes.length) {
                                // the class is not a real tuple because it 
contains additional fields. treat as a pojo
-                               return (TypeInformation<OUT>) 
analyzePojo(tAsClass, new ArrayList<Type>() ); // the typeHierarchy here should 
be sufficient, even though it stops at the Tuple.class.
+                               return (TypeInformation<OUT>) 
analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here 
should be sufficient, even though it stops at the Tuple.class.
                        }
                        
                        return new TupleTypeInfo(tAsClass, tupleSubTypes);
@@ -399,7 +399,7 @@ public class TypeExtractor {
                }
                // objects with generics are treated as raw type
                else if (t instanceof ParameterizedType) { //TODO
-                       return privateGetForClass((Class<OUT>) 
((ParameterizedType) t).getRawType(), typeHierarchy);
+                       return privateGetForClass((Class<OUT>) 
((ParameterizedType) t).getRawType(), typeHierarchy, (ParameterizedType) t);
                }
                // no tuple, no TypeVariable, no generic type
                else if (t instanceof Class) {
@@ -859,8 +859,11 @@ public class TypeExtractor {
                return new TypeExtractor().privateGetForClass(clazz, new 
ArrayList<Type>());
        }
        
-       @SuppressWarnings("unchecked")
        private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, 
ArrayList<Type> typeHierarchy) {
+               return privateGetForClass(clazz, typeHierarchy, null);
+       }
+       @SuppressWarnings("unchecked")
+       private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, 
ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
                Validate.notNull(clazz);
                
                // check for abstract classes or interfaces
@@ -931,7 +934,7 @@ public class TypeExtractor {
                        // this is a type generated by Avro. GenericTypeInfo is 
able to handle this case because its using Avro.
                        return new GenericTypeInfo<X>(clazz);
                }
-               TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy);
+               TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, 
clazzTypeHint);
                if (pojoType != null) {
                        return pojoType;
                }
@@ -968,7 +971,7 @@ public class TypeExtractor {
                                // check for getter
                                
                                if(     // The name should be "get<FieldName>" 
or "<fieldName>" (for scala).
-                    (m.getName().toLowerCase().contains("get"+fieldNameLow) || 
m.getName().toLowerCase().contains(fieldNameLow)) &&
+                                       
(m.getName().toLowerCase().equals("get"+fieldNameLow) || 
m.getName().toLowerCase().equals(fieldNameLow)) &&
                                        // no arguments for the getter
                                        m.getParameterTypes().length == 0 &&
                                        // return type is same as field type 
(or the generic variant of it)
@@ -980,7 +983,7 @@ public class TypeExtractor {
                                        hasGetter = true;
                                }
                                // check for setters (<FieldName>_$eq for scala)
-                               
if((m.getName().toLowerCase().contains("set"+fieldNameLow) || 
m.getName().toLowerCase().contains(fieldNameLow+"_$eq")) &&
+                               
if((m.getName().toLowerCase().equals("set"+fieldNameLow) || 
m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) &&
                                        m.getParameterTypes().length == 1 && // 
one parameter of the field's type
                                        ( m.getParameterTypes()[0].equals( 
fieldType ) || (fieldTypeGeneric != null && 
m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
                                        // return type is void.
@@ -1006,13 +1009,16 @@ public class TypeExtractor {
                }
        }
 
-       private <X> TypeInformation<X> analyzePojo(Class<X> clazz, 
ArrayList<Type> typeHierarchy) {
+       private <X> TypeInformation<X> analyzePojo(Class<X> clazz, 
ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
                // try to create Type hierarchy, if the incoming one is empty.
                if(typeHierarchy.size() == 0) {
                        recursivelyGetTypeHierarchy(typeHierarchy, clazz, 
Object.class);
                }
+               if(clazzTypeHint != null) {
+                       recursivelyGetTypeHierarchy(typeHierarchy, 
clazzTypeHint, Object.class);
+               }
                
-               List<Field> fields = 
removeNonObjectFields(getAllDeclaredFields(clazz));
+               List<Field> fields = getAllDeclaredFields(clazz);
                List<PojoField> pojoFields = new ArrayList<PojoField>();
                for (Field field : fields) {
                        Type fieldType = field.getGenericType();
@@ -1057,32 +1063,40 @@ public class TypeExtractor {
                return pojoType;
        }
 
-       // recursively determine all declared fields
-       private static List<Field> getAllDeclaredFields(Class<?> clazz) {
+       /**
+        * recursively determine all declared fields
+        * This is required because getFields() is not returning 
+        */
+       public static List<Field> getAllDeclaredFields(Class<?> clazz) {
                List<Field> result = new ArrayList<Field>();
                while (clazz != null) {
                        Field[] fields = clazz.getDeclaredFields();
                        for (Field field : fields) {
+                               if(Modifier.isTransient(field.getModifiers()) 
|| Modifier.isStatic(field.getModifiers())) {
+                                       continue; // we have no use for 
transient or static fields
+                               }
+                               if(hasFieldWithSameName(field.getName(), 
result)) {
+                                       throw new RuntimeException("The field 
"+field+" is already contained in the hierarchy of the class "+clazz+"."
+                                                       + "Please use unique 
field names through your classes hierarchy");
+                               }
                                result.add(field);
                        }
                        clazz = clazz.getSuperclass();
                }
                return result;
        }
-
-       /**
-        * Remove transient and static fields from a list of fields.
-        */
-       private static List<Field> removeNonObjectFields(List<Field> fields) {
-               List<Field> result = new ArrayList<Field>();
-               for(Field field: fields) {
-                       if (!Modifier.isTransient(field.getModifiers()) && 
!Modifier.isStatic(field.getModifiers())) {
-                               result.add(field);
+       
+       private static boolean hasFieldWithSameName(String name, List<Field> 
fields) {
+               for(Field field : fields) {
+                       if(name.equals(field.getName())) {
+                               return true;
                        }
                }
-               return result;
+               return false;
        }
        
+
+       
        // recursively determine all declared methods
        private static List<Method> getAllDeclaredMethods(Class<?> clazz) {
                List<Method> result = new ArrayList<Method>();
@@ -1111,7 +1125,7 @@ public class TypeExtractor {
                        int numFields = t.getArity();
                        if(numFields != countFieldsInClass(value.getClass())) {
                                // not a tuple since it has more fields. 
-                               return analyzePojo((Class<X>) value.getClass(), 
new ArrayList<Type>()); // we immediately call analyze Pojo here, because
+                               return analyzePojo((Class<X>) value.getClass(), 
new ArrayList<Type>(), null); // we immediately call analyze Pojo here, because
                                // there is currently no other type that can 
handle such a class.
                        }
                        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index 00ab520..37a117f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.api.java.operators;
 
 import java.lang.reflect.InvocationTargetException;
@@ -24,10 +23,13 @@ import java.util.Arrays;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple7;
+import 
org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -93,7 +95,7 @@ public class KeysTest {
        }
        
        @Test 
-       public void testInvalid() throws Throwable {
+       public void testInvalidTuple() throws Throwable {
                TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, 
String>> typeInfo = new 
TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>(
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new TupleTypeInfo<Tuple3<String, String, 
String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
@@ -101,7 +103,12 @@ public class KeysTest {
                ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, 
String>> fpk;
                
                String[][] tests = new String[][] {
-                               new String[] {"f11"},new String[] {"f-35"}, new 
String[] {"f0.f33"}, new String[] {"f1.f33"}
+                               new String[] {"f0.f1"}, // nesting into unnested
+                               new String[] {"f11"},
+                               new String[] {"f-35"},
+                               new String[] {"f0.f33"},
+                               new String[] {"f1.f33"},
+                               new String[] {"f1"} // select full tuple 
without saying "f1.*"
                };
                for(int i = 0; i < tests.length; i++) {
                        Throwable e = null;
@@ -115,6 +122,28 @@ public class KeysTest {
                }
        }
        
+       @Test 
+       public void testInvalidPojo() throws Throwable {
+               TypeInformation<ComplexNestedClass> ti = 
TypeExtractor.getForClass(ComplexNestedClass.class);
+               ExpressionKeys<ComplexNestedClass> ek;
+               
+               String[][] tests = new String[][] {
+                               new String[] {"nonexistent"},
+                               new String[] {"date.abc"}, // nesting into 
unnested
+                               new String[] {"word"} // select full tuple 
without saying "f1.*"
+               };
+               for(int i = 0; i < tests.length; i++) {
+                       Throwable e = null;
+                       try {
+                               ek = new 
ExpressionKeys<ComplexNestedClass>(tests[i], ti);
+                       } catch(Throwable t) {
+                               // System.err.println("Message: 
"+t.getMessage()); t.printStackTrace();
+                               e = t;  
+                       }
+                       Assert.assertNotNull(e);
+               }
+       }
+       
        @Test
        public void testTupleKeyExpansion() {
                TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, 
String>> typeInfo = new 
TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>(
@@ -144,6 +173,10 @@ public class KeysTest {
                fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, 
String>, String>>(new String[] {"*"}, typeInfo);
                Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
                
+               // scala style "select all"
+               fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, 
String>, String>>(new String[] {"_"}, typeInfo);
+               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
fpk.computeLogicalKeyPositions());
+               
                // this was a bug:
                fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, 
String>, String>>(new String[] {"f2"}, typeInfo);
                Assert.assertArrayEquals(new int[] {4}, 
fpk.computeLogicalKeyPositions());
@@ -177,10 +210,45 @@ public class KeysTest {
                complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"*"}, complexTypeInfo);
                Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, 
complexFpk.computeLogicalKeyPositions());
                
+               // scala style select all
+               complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"_"}, complexTypeInfo);
+               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, 
complexFpk.computeLogicalKeyPositions());
+               
                complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"f1.f0.*"}, complexTypeInfo);
                Assert.assertArrayEquals(new int[] {1,2,3}, 
complexFpk.computeLogicalKeyPositions());
                
                complexFpk = new ExpressionKeys<Tuple3<String, 
Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] 
{"f2"}, complexTypeInfo);
                Assert.assertArrayEquals(new int[] {6}, 
complexFpk.computeLogicalKeyPositions());
        }
+
+       public static class Pojo1 {
+               public String a;
+               public String b;
+       }
+       public static class Pojo2 {
+               public String a2;
+               public String b2;
+       }
+       public static class PojoWithMultiplePojos {
+               public Pojo1 p1;
+               public Pojo2 p2;
+               public Integer i0;
+       }
+
+       @Test
+       public void testPojoKeys() {
+               TypeInformation<PojoWithMultiplePojos> ti = 
TypeExtractor.getForClass(PojoWithMultiplePojos.class);
+               ExpressionKeys<PojoWithMultiplePojos> ek;
+               ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"*"}, ti);
+               Assert.assertArrayEquals(new int[] {0,1,2,3,4}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"p1.*"}, ti);
+               Assert.assertArrayEquals(new int[] {1,2}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"p2.*"}, ti);
+               Assert.assertArrayEquals(new int[] {3,4}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"i0"}, ti);
+               Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 34dbd99..01d32c1 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -50,6 +50,14 @@ import com.google.common.collect.HashMultiset;
  */
 public class PojoTypeExtractionTest {
 
+       public static class HasDuplicateField extends WC {
+               private int count; // duplicate
+       }
+       
+       @Test(expected=RuntimeException.class)
+       public void testDuplicateFieldException() {
+               TypeExtractor.createTypeInfo(HasDuplicateField.class);
+       }
        
        // test with correct pojo types
        public static class WC { // is a pojo
@@ -210,6 +218,10 @@ public class PojoTypeExtractionTest {
                }
                ffd.clear();
                
+               // scala style full tuple selection for pojos
+               pojoType.getKey("complex.word._", 0, ffd);
+               Assert.assertEquals(3, ffd.size());
+               ffd.clear();
                
                pojoType.getKey("complex.*", 0, ffd);
                Assert.assertEquals(8, ffd.size());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index b4b1c19..695a42e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1247,7 +1247,6 @@ public class TypeExtractorTest {
        public static class InType extends MyObject<String> {}
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
-//     @Ignore
        public void testParamertizedCustomObject() {
                RichMapFunction<?, ?> function = new RichMapFunction<InType, 
MyObject<String>>() {
                        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
index 61f6167..e53f48a 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
@@ -28,10 +28,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
-import org.junit.Ignore;
 
 
-//@Ignore // TODO
 public class PojoComparatorTest extends 
ComparatorTestBase<PojoContainingTuple> {
        TypeInformation<PojoContainingTuple> type = 
TypeExtractor.getForClass(PojoContainingTuple.class);
        
@@ -39,7 +37,7 @@ public class PojoComparatorTest extends 
ComparatorTestBase<PojoContainingTuple>
                new PojoContainingTuple(1, 1L, 1L),
                new PojoContainingTuple(2, 2L, 2L),
                new PojoContainingTuple(8519, 85190L, 85190L),
-               new PojoContainingTuple(-51498, 85191L, 85191L),
+               new PojoContainingTuple(8520, 85191L, 85191L),
        };
 
        @Override
@@ -48,7 +46,7 @@ public class PojoComparatorTest extends 
ComparatorTestBase<PojoContainingTuple>
                CompositeType<PojoContainingTuple> cType = 
(CompositeType<PojoContainingTuple>) type;
                ExpressionKeys<PojoContainingTuple> keys = new 
ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
                boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
-               Arrays.fill(orders, true);
+               Arrays.fill(orders, ascending);
                return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
index 4f00b1d..9701086 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.test.exampleScalaPrograms;
 
-import org.apache.flink.examples.java.graph.EnumTrianglesOpt;
+import org.apache.flink.examples.scala.graph.EnumTrianglesOpt;
 import org.apache.flink.test.testdata.EnumTriangleData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index 42c6a8e..bc026c9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.scala.graph.PageRankBasic;
 import org.apache.flink.test.testdata.PageRankData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index fce56d1..63c4dfc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -53,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 19;
+       private static int NUM_PROGRAMS = 26;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -598,7 +598,175 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
                                        
                                        // return expected result
                                        return "3\n1\n";
-                               } 
+                               }
+                               case 20: {
+                                       /*
+                                        * Test string-based definition on 
group sort, based on test:
+                                        * check correctness of groupReduce 
with descending group sort
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
+                                                       
groupBy(1).sortGroup("f2", Order.DESCENDING).reduceGroup(new 
Tuple3SortedGroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1,Hi\n" +
+                                                       "5,2,Hello 
world-Hello\n" +
+                                                       "15,3,Luke Skywalker-I 
am fine.-Hello world, how are you?\n" +
+                                                       
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
+                                                       
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
+                                                       
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+                               }
+                               case 21: {
+                                       /*
+                                        * Test int-based definition on group 
sort, for (full) nested Tuple
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new 
NestedTupleReducer());
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "a--(1,1)-(1,2)-(1,3)-\n" +
+                                                       "b--(2,2)-\n"+
+                                                       
"c--(3,3)-(3,6)-(3,9)-\n";
+                               }
+                               case 22: {
+                                       /*
+                                        * Test int-based definition on group 
sort, for (partial) nested Tuple ASC
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+                                       // f0.f0 is first integer
+                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup("f0.f0", Order.ASCENDING).reduceGroup(new 
NestedTupleReducer());
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "a--(1,3)-(1,2)-(2,1)-\n" +
+                                                       "b--(2,2)-\n"+
+                                                       
"c--(3,3)-(3,6)-(4,9)-\n";
+                               }
+                               case 23: {
+                                       /*
+                                        * Test string-based definition on 
group sort, for (partial) nested Tuple DESC
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+                                       // f0.f0 is first integer
+                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).reduceGroup(new 
NestedTupleReducer());
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "a--(2,1)-(1,3)-(1,2)-\n" +
+                                                       "b--(2,2)-\n"+
+                                                       
"c--(4,9)-(3,3)-(3,6)-\n";
+                               }
+                               case 24: {
+                                       /*
+                                        * Test string-based definition on 
group sort, for two grouping keys
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple2<Tuple2<Integer, 
Integer>, String>> ds = 
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+                                       // f0.f0 is first integer
+                                       DataSet<String> reduceDs = 
ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", 
Order.DESCENDING).reduceGroup(new NestedTupleReducer());
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "a--(2,1)-(1,3)-(1,2)-\n" +
+                                                       "b--(2,2)-\n"+
+                                                       
"c--(4,9)-(3,6)-(3,3)-\n";
+                               }
+                               case 25: {
+                                       /*
+                                        * Test string-based definition on 
group sort, for two grouping keys with Pojos
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<PojoContainingTupleAndWritable> 
ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
+                                       // f0.f0 is first integer
+                                       DataSet<String> reduceDs = 
ds.groupBy("hadoopFan").sortGroup("theTuple.f0", 
Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
+                                                       .reduceGroup(new 
GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, 
String>() {
+                                                               @Override
+                                                               public void 
reduce(
+                                                                               
Iterable<PojoContainingTupleAndWritable> values,
+                                                                               
Collector<String> out) throws Exception {
+                                                                       boolean 
once = false;
+                                                                       
StringBuilder concat = new StringBuilder();
+                                                                       
for(PojoContainingTupleAndWritable value : values) {
+                                                                               
if(!once) {
+                                                                               
        concat.append(value.hadoopFan.get());
+                                                                               
        concat.append("---");
+                                                                               
        once = true;
+                                                                               
}
+                                                                               
concat.append(value.theTuple);
+                                                                               
concat.append("-");
+                                                                       }
+                                                                       
out.collect(concat.toString());
+                                                               }
+                                       });
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+                                       
+                                       // return expected result
+                                       return "1---(10,100)-\n" +
+                                                       
"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
+                               }
+                               case 26: {
+                                       /*
+                                        * Test grouping with pojo containing 
multiple pojos (was a bug)
+                                        */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<PojoContainingTupleAndWritable> 
ds = CollectionDataSets.getPojoWithMultiplePojos(env);
+                                       // f0.f0 is first integer
+                                       DataSet<String> reduceDs = 
ds.groupBy("hadoopFan").sortGroup("theTuple.f0", 
Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
+                                                       .reduceGroup(new 
GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, 
String>() {
+                                                               @Override
+                                                               public void 
reduce(
+                                                                               
Iterable<PojoContainingTupleAndWritable> values,
+                                                                               
Collector<String> out) throws Exception {
+                                                                       boolean 
once = false;
+                                                                       
StringBuilder concat = new StringBuilder();
+                                                                       
for(PojoContainingTupleAndWritable value : values) {
+                                                                               
if(!once) {
+                                                                               
        concat.append(value.hadoopFan.get());
+                                                                               
        concat.append("---");
+                                                                               
        once = true;
+                                                                               
}
+                                                                               
concat.append(value.theTuple);
+                                                                               
concat.append("-");
+                                                                       }
+                                                                       
out.collect(concat.toString());
+                                                               }
+                                                       });
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1---(10,100)-\n" +
+                                                       
"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
+                               }
+                               
                                default: {
                                        throw new 
IllegalArgumentException("Invalid program id");
                                }
@@ -607,6 +775,27 @@ public class GroupReduceITCase extends JavaProgramTestBase 
{
        
        }
        
+       
+       public static class NestedTupleReducer implements 
GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> {
+               @Override
+               public void reduce(
+                               Iterable<Tuple2<Tuple2<Integer, Integer>, 
String>> values,
+                               Collector<String> out)
+                               throws Exception {
+                       boolean once = false;
+                       StringBuilder concat = new StringBuilder();
+                       for(Tuple2<Tuple2<Integer, Integer>, String> value : 
values) {
+                               if(!once) {
+                                       concat.append(value.f1).append("--");
+                                       once = true;
+                               }
+                               concat.append(value.f0); // the tuple with the 
sorted groups
+                               concat.append("-");
+                       }
+                       out.collect(concat.toString());
+               }
+       }
+       
        public static class Tuple3GroupReduce implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd0be2ff/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index d023287..e5ab29a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -155,6 +155,26 @@ public class CollectionDataSets {
                return env.fromCollection(data, type);
        }
        
+       public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> 
getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+               
+               List<Tuple2<Tuple2<Integer, Integer>, String>> data = new 
ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>();
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(1,3), "a"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(1,2), "a"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(2,1), "a"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(2,2), "b"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(3,3), "c"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(3,6), "c"));
+               data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new 
Tuple2<Integer, Integer>(4,9), "c"));
+               
+               TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = 
new 
+                               TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, 
String>>(
+                                               new 
TupleTypeInfo<Tuple2<Integer, 
Integer>>(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO),
+                                               BasicTypeInfo.STRING_TYPE_INFO
+                               );
+               
+               return env.fromCollection(data, type);
+       }
+       
        public static DataSet<String> getStringDataSet(ExecutionEnvironment 
env) {
                
                List<String> data = new ArrayList<String>();
@@ -418,5 +438,37 @@ public class CollectionDataSets {
                data.add(new Tuple3<Integer,CrazyNested, POJO>(2, new 
CrazyNested("two", "duo", 2L), new POJO(1, "First",10, 100, 1000L, "One", 
10000L) )); // 1x
                return env.fromCollection(data);
        }
+
	/**
	 * Minimal POJO with two public String fields; nested inside
	 * {@code PojoWithMultiplePojos} to test nested-POJO key expressions.
	 */
	public static class Pojo1 {
		public String a;
		public String b;
	}
	/**
	 * Minimal POJO with two public String fields; nested inside
	 * {@code PojoWithMultiplePojos} to test nested-POJO key expressions.
	 */
	public static class Pojo2 {
		public String a2;
		public String b2;
	}
+       public static class PojoWithMultiplePojos {
+               public Pojo1 p1;
+               public Pojo2 p2;
+               public Integer i0;
+               public PojoWithMultiplePojos() {}
+               public PojoWithMultiplePojos(String a, String b, String a1, 
String b1, Integer i0) {
+                       p1 = new Pojo1();
+                       p1.a = a;
+                       p1.b = b;
+                       p2 = new Pojo2();
+                       p2.a2 = a1;
+                       p2.a2 = b1;
+                       this.i0 = i0;
+               }
+       }
+       
+       public static DataSet<PojoWithMultiplePojos> 
getPojoWithMultiplePojos(ExecutionEnvironment env) {
+               List<PojoWithMultiplePojos> data = new 
ArrayList<PojoWithMultiplePojos>();
+               data.add(new PojoWithMultiplePojos("a","aa","b","bb", 1));
+               data.add(new PojoWithMultiplePojos("b","bb","c","cc", 2));
+               data.add(new PojoWithMultiplePojos("d","dd","e","ee", 3));
+               return env.fromCollection(data);
+       }
        
 }

Reply via email to