[FLINK-1664] Adds check if a selected sort key is sortable

This closes #541


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f36eb54e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f36eb54e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f36eb54e

Branch: refs/heads/master
Commit: f36eb54ee6d8cc130439def98559b6b0a70b6c7b
Parents: f39aec8
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Mar 27 21:37:59 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Apr 3 20:42:05 2015 +0200

----------------------------------------------------------------------
 .../api/common/typeinfo/TypeInformation.java    |  10 +-
 .../api/common/typeutils/CompositeType.java     |  20 ++
 .../flink/api/java/SortPartitionOperator.java   |  30 +-
 .../flink/api/java/operators/DataSink.java      |  28 ++
 .../apache/flink/api/java/operators/Keys.java   |  14 +-
 .../api/java/operators/SortedGrouping.java      |  71 +++--
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   7 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  19 --
 .../flink/api/java/operator/DataSinkTest.java   |  46 +--
 .../flink/api/java/operator/GroupingTest.java   | 278 +++++++++++++++++--
 .../api/java/operator/SortPartitionTest.java    | 204 ++++++++++++++
 11 files changed, 637 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 4fa02e3..bb50e32 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -132,7 +132,15 @@ public abstract class TypeInformation<T> implements 
Serializable {
         * @return True, if the type can be used as a key, false otherwise.
         */
        public abstract boolean isKeyType();
-       
+
+       /**
+        * Checks whether this type can be used as a key for sorting.
+        * The order produced by sorting this type must be meaningful.
+        */
+       public boolean isSortKeyType() {
+               return isKeyType();
+       }
+
        /**
         * Creates a serializer for the type. The serializer may use the 
ExecutionConfig
         * for parameterization.

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 54a1e13..de39ec8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -169,6 +169,26 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                return getFieldIndex(fieldName) >= 0;
        }
 
+       @Override
+       public boolean isKeyType() {
+               for(int i=0;i<this.getArity();i++) {
+                       if (!this.getTypeAt(i).isKeyType()) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       @Override
+       public boolean isSortKeyType() {
+               for(int i=0;i<this.getArity();i++) {
+                       if (!this.getTypeAt(i).isSortKeyType()) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
        /**
         * Returns the names of the composite fields of this type. The order of 
the returned array must
         * be consistent with the internal field index ordering.

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
index c8f8bbc..988144b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.SingleInputOperator;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 import java.util.Arrays;
 
@@ -96,11 +98,23 @@ public class SortPartitionOperator<T> extends 
SingleInputOperator<T, T, SortPart
 
        private int[] getFlatFields(int field) {
 
+               if(!(super.getType() instanceof TupleTypeInfoBase<?>)) {
+                       throw new InvalidProgramException("Field positions can 
only be specified on Tuple or " +
+                                       "Case Class types.");
+               }
+               else {
+                       // check selected field is sortable type
+                       TypeInformation<?> sortKeyType = 
((TupleTypeInfoBase<?>) super.getType()).getTypeAt(field);
+                       if (!sortKeyType.isSortKeyType()) {
+                               throw new InvalidProgramException("Selected 
sort key is not a sortable type " + sortKeyType);
+                       }
+               }
+
                Keys.ExpressionKeys<T> ek;
                try {
                        ek = new Keys.ExpressionKeys<T>(new int[]{field}, 
super.getType());
                } catch(IllegalArgumentException iae) {
-                       throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
+                       throw new InvalidProgramException("Invalid 
specification of field position.", iae);
                }
                return ek.computeLogicalKeyPositions();
        }
@@ -108,6 +122,13 @@ public class SortPartitionOperator<T> extends 
SingleInputOperator<T, T, SortPart
        private int[] getFlatFields(String fields) {
 
                if(super.getType() instanceof CompositeType) {
+
+                       // check selected field is sortable type
+                       TypeInformation<?> sortKeyType = ((CompositeType<?>) 
super.getType()).getTypeAt(fields);
+                       if (!sortKeyType.isSortKeyType()) {
+                               throw new InvalidProgramException("Selected 
sort key is not a sortable type " + sortKeyType);
+                       }
+
                        // compute flat field positions for (nested) sorting 
fields
                        Keys.ExpressionKeys<T> ek;
                        try {
@@ -123,6 +144,12 @@ public class SortPartitionOperator<T> extends 
SingleInputOperator<T, T, SortPart
                                throw new InvalidProgramException("Output 
sorting of non-composite types can only be defined on the full type. " +
                                                "Use a field wildcard for that 
(\"*\" or \"_\")");
                        } else {
+
+                               // check if selected field is sortable type
+                               if (!super.getType().isSortKeyType()) {
+                                       throw new 
InvalidProgramException("Selected sort key cannot be sorted: " + 
super.getType());
+                               }
+
                                return new int[]{0};
                        }
                }
@@ -149,7 +176,6 @@ public class SortPartitionOperator<T> extends 
SingleInputOperator<T, T, SortPart
                }
        }
 
-
        // 
--------------------------------------------------------------------------------------------
        //  Translation
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 83ec021..5b5b031 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.api.java.DataSet;
@@ -114,6 +115,7 @@ public class DataSink<T> {
                if (field >= this.type.getArity()) {
                        throw new InvalidProgramException("Order key out of 
tuple bounds.");
                }
+               isValidSortKeyType(field);
 
                // get flat keys
                Keys.ExpressionKeys<T> ek;
@@ -166,9 +168,11 @@ public class DataSink<T> {
                Order[] orders;
 
                if(this.type instanceof CompositeType) {
+
                        // compute flat field positions for (nested) sorting 
fields
                        Keys.ExpressionKeys<T> ek;
                        try {
+                               isValidSortKeyType(fieldExpression);
                                ek = new Keys.ExpressionKeys<T>(new 
String[]{fieldExpression}, this.type);
                        } catch(IllegalArgumentException iae) {
                                throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
@@ -183,6 +187,8 @@ public class DataSink<T> {
                                throw new InvalidProgramException("Output 
sorting of non-composite types can only be defined on the full type. " +
                                                "Use a field wildcard for that 
(\"*\" or \"_\")");
                        } else {
+                               isValidSortKeyType(fieldExpression);
+
                                numFields = 1;
                                fields = new int[]{0};
                                orders = new Order[]{order};
@@ -208,6 +214,28 @@ public class DataSink<T> {
                return this;
        }
 
+       private void isValidSortKeyType(int field) {
+               TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) 
this.type).getTypeAt(field);
+               if (!sortKeyType.isSortKeyType()) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
+               }
+       }
+
+       private void isValidSortKeyType(String field) {
+               TypeInformation<?> sortKeyType;
+
+               field = field.trim();
+               if(field.equals("*") || field.equals("_")) {
+                       sortKeyType = this.type;
+               } else {
+                       sortKeyType = ((CompositeType<?>) 
this.type).getTypeAt(field);
+               }
+
+               if (!sortKeyType.isSortKeyType()) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
+               }
+       }
+
        /**
         * @return Configuration for the OutputFormat.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 2c067fd..a2cde07 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -82,19 +82,19 @@ public abstract class Keys<T> {
 
                        this.keyExtractor = keyExtractor;
                        this.keyType = keyType;
-                       
+
+                       if(!keyType.isKeyType()) {
+                               throw new InvalidProgramException("Return type 
"+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key 
type");
+                       }
+
                        // we have to handle a special case here:
-                       // if the keyType is a tuple type, we need to select 
the full tuple with all its fields.
-                       if(keyType.isTupleType()) {
+                       // if the keyType is a composite type, we need to 
select the full type with all its fields.
+                       if(keyType instanceof CompositeType) {
                                ExpressionKeys<K> ek = new 
ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR}, keyType);
                                logicalKeyFields = 
ek.computeLogicalKeyPositions();
                        } else {
                                logicalKeyFields = new int[] {0};
                        }
-
-                       if (!this.keyType.isKeyType()) {
-                               throw new IllegalArgumentException("Invalid 
type of KeySelector keys");
-                       }
                }
 
                public TypeInformation<K> getKeyType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 287bf82..4c6c952 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import com.google.common.base.Preconditions;
@@ -63,6 +64,8 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (field >= dataSet.getType().getArity()) {
                        throw new IllegalArgumentException("Order key out of 
tuple bounds.");
                }
+               isValidSortKeyType(field);
+
                // use int-based expression key to properly resolve nested 
tuples for grouping
                ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, 
dataSet.getType());
                this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
@@ -79,6 +82,8 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (!(dataSet.getType() instanceof CompositeType)) {
                        throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for composite data types (pojo / tuple / 
case class)");
                }
+               isValidSortKeyType(field);
+
                // resolve String-field to int using the expression keys
                ExpressionKeys<T> ek = new ExpressionKeys<T>(new 
String[]{field}, dataSet.getType());
                this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
@@ -95,6 +100,10 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
                        throw new InvalidProgramException("Sorting on 
KeySelector keys only works with KeySelector grouping.");
                }
+               TypeInformation<?> sortKeyType = keySelector.getKeyType();
+               if(!sortKeyType.isSortKeyType()) {
+                       throw new InvalidProgramException("Key type " + 
sortKeyType +" is not sortable.");
+               }
 
                this.groupSortKeyPositions = 
keySelector.computeLogicalKeyPositions();
                for (int i = 0; i < groupSortKeyPositions.length; i++) {
@@ -218,35 +227,22 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (field >= dataSet.getType().getArity()) {
                        throw new IllegalArgumentException("Order key out of 
tuple bounds.");
                }
+               isValidSortKeyType(field);
+
                ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, 
dataSet.getType());
                addSortGroupInternal(ek, order);
                return this;
        }
-       
-       private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
-               Preconditions.checkArgument(order != null, "Order can not be 
null");
-               int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
-               
-               int newLength = this.groupSortKeyPositions.length + 
additionalKeyPositions.length;
-               this.groupSortKeyPositions = 
Arrays.copyOf(this.groupSortKeyPositions, newLength);
-               this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, 
newLength);
-               int pos = newLength - additionalKeyPositions.length;
-               int off = newLength - additionalKeyPositions.length;
-               for(;pos < newLength; pos++) {
-                       this.groupSortKeyPositions[pos] = 
additionalKeyPositions[pos - off];
-                       this.groupSortOrders[pos] = order; // use the same order
-               }
-       }
-       
+
        /**
         * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements 
within a group on the specified field in the specified {@link Order}.</br>
         * <b>Note: Only groups of Tuple or Pojo elements can be 
sorted.</b><br/>
         * Groups can be sorted by multiple fields by chaining {@link 
#sortGroup(String, Order)} calls.
-        * 
+        *
         * @param field The Tuple or Pojo field on which the group is sorted.
         * @param order The Order in which the specified field is sorted.
         * @return A SortedGrouping with specified order of group element.
-        * 
+        *
         * @see org.apache.flink.api.java.tuple.Tuple
         * @see Order
         */
@@ -257,9 +253,48 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (! (dataSet.getType() instanceof CompositeType)) {
                        throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for composite data types (pojo / tuple / 
case class)");
                }
+               isValidSortKeyType(field);
+
                ExpressionKeys<T> ek = new ExpressionKeys<T>(new 
String[]{field}, dataSet.getType());
                addSortGroupInternal(ek, order);
                return this;
        }
+       
+       private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
+               Preconditions.checkArgument(order != null, "Order can not be 
null");
+               int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
+               
+               int newLength = this.groupSortKeyPositions.length + 
additionalKeyPositions.length;
+               this.groupSortKeyPositions = 
Arrays.copyOf(this.groupSortKeyPositions, newLength);
+               this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, 
newLength);
+               int pos = newLength - additionalKeyPositions.length;
+               int off = newLength - additionalKeyPositions.length;
+               for(;pos < newLength; pos++) {
+                       this.groupSortKeyPositions[pos] = 
additionalKeyPositions[pos - off];
+                       this.groupSortOrders[pos] = order; // use the same order
+               }
+       }
+
+       private void isValidSortKeyType(int field) {
+               TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) 
dataSet.getType()).getTypeAt(field);
+               if (!sortKeyType.isSortKeyType()) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
+               }
+       }
+
+       private void isValidSortKeyType(String field) {
+               TypeInformation<?> sortKeyType;
+
+               field = field.trim();
+               if(field.equals("*") || field.equals("_")) {
+                       sortKeyType = this.getDataSet().getType();
+               } else {
+                       sortKeyType = ((CompositeType<?>) 
this.getDataSet().getType()).getTypeAt(field);
+               }
+
+               if (!sortKeyType.isSortKeyType()) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
+               }
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 1dcee24..2f3db7c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -118,8 +118,11 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        @Override
-       public boolean isKeyType() {
-               return Comparable.class.isAssignableFrom(typeClass);
+       public boolean isSortKeyType() {
+               // Support for sorting POJOs that implement Comparable is not 
implemented yet.
+               // Since the order of fields in a POJO type is not well 
defined, sorting on fields
+               //   gives only some undefined order.
+               return false;
        }
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index d1c2c9d..5051449 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -223,11 +223,6 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
        }
        
        @Override
-       public boolean isKeyType() {
-               return isValidKeyType(this);
-       }
-
-       @Override
        public boolean equals(Object obj) {
                if (obj instanceof TupleTypeInfoBase) {
                        @SuppressWarnings("unchecked")
@@ -245,20 +240,6 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
        }
 
-       private boolean isValidKeyType(TypeInformation<?> typeInfo) {
-               if(typeInfo instanceof TupleTypeInfoBase) {
-                       TupleTypeInfoBase<?> tupleType = 
((TupleTypeInfoBase<?>)typeInfo);
-                       for(int i=0;i<tupleType.getArity();i++) {
-                               if (!isValidKeyType(tupleType.getTypeAt(i))) {
-                                       return false;
-                               }
-                       }
-                       return true;
-               } else  {
-                       return typeInfo.isKeyType();
-               }
-       }
-
        @Override
        public String toString() {
                StringBuilder bld = new StringBuilder("Tuple");

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 7a7ed14..37ad381 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -256,23 +256,6 @@ public class DataSinkTest {
        }
 
        @Test
-       public void testPojoSingleOrderFull() {
-
-               final ExecutionEnvironment env = ExecutionEnvironment
-                               .getExecutionEnvironment();
-               DataSet<CustomType> pojoDs = env
-                               .fromCollection(pojoData);
-
-               // should work
-               try {
-                       pojoDs.writeAsText("/tmp/willNotHappen")
-                                       .sortLocalOutput("*", Order.ASCENDING);
-               } catch (Exception e) {
-                       Assert.fail();
-               }
-       }
-
-       @Test
        public void testPojoTwoOrder() {
 
                final ExecutionEnvironment env = ExecutionEnvironment
@@ -317,6 +300,35 @@ public class DataSinkTest {
                                .sortLocalOutput("notThere", Order.DESCENDING);
        }
 
+       @Test(expected = InvalidProgramException.class)
+       public void testPojoSingleOrderFull() {
+
+               final ExecutionEnvironment env = ExecutionEnvironment
+                               .getExecutionEnvironment();
+               DataSet<CustomType> pojoDs = env
+                               .fromCollection(pojoData);
+
+               // must not work
+               pojoDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("*", Order.ASCENDING);
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testArrayOrderFull() {
+
+               List<Object[]> arrayData = new ArrayList<Object[]>();
+               arrayData.add(new Object[0]);
+
+               final ExecutionEnvironment env = ExecutionEnvironment
+                               .getExecutionEnvironment();
+               DataSet<Object[]> pojoDs = env
+                               .fromCollection(arrayData);
+
+               // must not work
+               pojoDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("*", Order.ASCENDING);
+       }
+
        /**
         * Custom data type, for testing purposes.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index c958680..314695f 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -24,13 +24,16 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,11 +51,23 @@ public class GroupingTest {
                                        BasicTypeInfo.LONG_TYPE_INFO,
                                        BasicTypeInfo.INT_TYPE_INFO
                        );
-       
+
+       private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> 
tupleWithCustomInfo = new
+                       TupleTypeInfo<Tuple4<Integer, Long, CustomType, 
Long[]>>(
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               TypeExtractor.createTypeInfo(CustomType.class),
+                               BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+                       );
+
        // LONG DATA
        private final List<Long> emptyLongData = new ArrayList<Long>();
        
        private final List<CustomType> customTypeData = new 
ArrayList<CustomType>();
+
+       private final List<Tuple4<Integer, Long, CustomType, Long[]>> 
tupleWithCustomData =
+                       new ArrayList<Tuple4<Integer, Long, CustomType, 
Long[]>>();
+
        
        @Test  
        public void testGroupByKeyFields1() {
@@ -187,7 +202,6 @@ public class GroupingTest {
                // should not work, key out of tuple bounds
                ds.groupBy("nested.myNonExistent");
        }
-
        
        @Test
        @SuppressWarnings("serial")
@@ -233,41 +247,67 @@ public class GroupingTest {
                        Assert.fail();
                }
        }
-       
-       @Test(expected=IllegalArgumentException.class)
+
+       @Test
        @SuppressWarnings("serial")
        public void testGroupByKeySelector3() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                this.customTypeData.add(new CustomType());
-               
-               DataSet<CustomType> customDs = 
env.fromCollection(customTypeData);
-               // should not work
-               customDs.groupBy(
-                               new KeySelector<GroupingTest.CustomType, 
CustomType>() {
-                                       @Override
-                                       public CustomType getKey(CustomType 
value) {
-                                               return value;
-                               }
-               });
+
+               try {
+                       DataSet<CustomType> customDs = 
env.fromCollection(customTypeData);
+                       // should work: grouping on a key selector that returns the CustomType itself
+                       customDs.groupBy(
+                                       new 
KeySelector<GroupingTest.CustomType, CustomType>() {
+                                               @Override
+                                               public CustomType 
getKey(CustomType value) {
+                                                       return value;
+                                               }
+                                       });
+               } catch(Exception e) {
+                       Assert.fail();
+               }
        }
-       
-       @Test(expected=IllegalArgumentException.class)
+
+       @Test
        @SuppressWarnings("serial")
        public void testGroupByKeySelector4() {
-               
+
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                this.customTypeData.add(new CustomType());
-               
+
+               try {
+                       DataSet<CustomType> customDs = 
env.fromCollection(customTypeData);
+                       // should work: grouping on a Tuple2 key that contains the CustomType
+                       customDs.groupBy(
+                                       new 
KeySelector<GroupingTest.CustomType, Tuple2<Integer, 
GroupingTest.CustomType>>() {
+                                               @Override
+                                               public Tuple2<Integer, 
CustomType> getKey(CustomType value) {
+                                                       return new 
Tuple2<Integer, CustomType>(value.myInt, value);
+                                               }
+                                       });
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       @SuppressWarnings("serial")
+       public void testGroupByKeySelector5() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               this.customTypeData.add(new CustomType());
+
                DataSet<CustomType> customDs = 
env.fromCollection(customTypeData);
                // should not work
                customDs.groupBy(
-                               new KeySelector<GroupingTest.CustomType, 
Tuple2<Integer, GroupingTest.CustomType>>() {
+                               new KeySelector<GroupingTest.CustomType, 
CustomType2>() {
                                        @Override
-                                       public Tuple2<Integer, CustomType> 
getKey(CustomType value) {
-                                               return new Tuple2<Integer, 
CustomType>(value.myInt, value);
-                               }
-               });
+                                       public CustomType2 getKey(CustomType 
value) {
+                                               return new CustomType2(); // key type declares an int[] field; the test expects groupBy to reject it
+                                       }
+                               });
        }
        
        @Test
@@ -313,6 +353,30 @@ public class GroupingTest {
                }).sortGroup(0, Order.ASCENDING);
                
        }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortKeyFields4() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: position 2 is the CustomType field of the tuple
+               tupleDs.groupBy(0)
+                               .sortGroup(2, Order.ASCENDING);
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortKeyFields5() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: position 3 is the Long[] field of the tuple
+               tupleDs.groupBy(0)
+                               .sortGroup(3, Order.ASCENDING);
+       }
        
        @Test
        public void testChainedGroupSortKeyFields() {
@@ -327,7 +391,166 @@ public class GroupingTest {
                        Assert.fail();
                }
        }
-       
+
+       @Test
+       public void testGroupSortByKeyExpression1() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should work: "f1" is the Long field of the tuple
+               try {
+                       tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test
+       public void testGroupSortByKeyExpression2() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should work: "f2.myString" selects a field nested inside the CustomType at f2
+               try {
+                       tupleDs.groupBy("f0").sortGroup("f2.myString", 
Order.ASCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test
+       public void testGroupSortByKeyExpression3() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should work: chained sort on the nested "f2.myString" and on the Long field "f1"
+               try {
+                       tupleDs.groupBy("f0")
+                                       .sortGroup("f2.myString", 
Order.ASCENDING)
+                                       .sortGroup("f1", Order.DESCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortByKeyExpression4() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: "f2" selects the CustomType field as a whole
+               tupleDs.groupBy("f0")
+                               .sortGroup("f2", Order.ASCENDING);
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortByKeyExpression5() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: the second sort key "f2" selects the CustomType field as a whole
+               tupleDs.groupBy("f0")
+                               .sortGroup("f1", Order.ASCENDING)
+                               .sortGroup("f2", Order.ASCENDING);
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortByKeyExpression6() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: "f3" is the Long[] field of the tuple
+               tupleDs.groupBy("f0")
+                               .sortGroup("f3", Order.ASCENDING);
+       }
+
+       @SuppressWarnings("serial")
+       @Test
+       public void testGroupSortByKeySelector1() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should work: the sort key selector returns the Integer field f0
+               tupleDs.groupBy(
+                               new 
KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+                                       @Override
+                                       public Long getKey(Tuple4<Integer, 
Long, CustomType, Long[]> value) throws Exception {
+                                               return value.f1;
+                                       }
+                               })
+                               .sortGroup(
+                                               new KeySelector<Tuple4<Integer, 
Long, CustomType, Long[]>, Integer>() {
+                                                       @Override
+                                                       public Integer 
getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+                                                               return value.f0;
+                                                       }
+                                               }, Order.ASCENDING);
+       }
+
+       @SuppressWarnings("serial")
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortByKeySelector2() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: the sort key selector returns the whole CustomType
+               tupleDs.groupBy(
+                               new 
KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+                                       @Override
+                                       public Long getKey(Tuple4<Integer, 
Long, CustomType, Long[]> value) throws Exception {
+                                               return value.f1;
+                                       }
+                               })
+                               .sortGroup(
+                                               new KeySelector<Tuple4<Integer, 
Long, CustomType, Long[]>, CustomType>() {
+                                                       @Override
+                                                       public CustomType 
getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+                                                               return value.f2;
+                                                       }
+                                               }, Order.ASCENDING);
+       }
+
+       @SuppressWarnings("serial")
+       @Test(expected = InvalidProgramException.class)
+       public void testGroupSortByKeySelector3() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+                               env.fromCollection(tupleWithCustomData, 
tupleWithCustomInfo);
+
+               // should not work: the sort key selector returns the Long[] field
+               tupleDs.groupBy(
+                               new 
KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+                                       @Override
+                                       public Long getKey(Tuple4<Integer, 
Long, CustomType, Long[]> value) throws Exception {
+                                               return value.f1;
+                                       }
+                               })
+                               .sortGroup(
+                                               new KeySelector<Tuple4<Integer, 
Long, CustomType, Long[]>, Long[]>() {
+                                                       @Override
+                                                       public Long[] 
getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
+                                                               return value.f3;
+                                                       }
+                                               }, Order.ASCENDING);
+       }
+
 
        public static class CustomType implements Serializable {
                
@@ -354,4 +577,11 @@ public class GroupingTest {
                        return myInt+","+myLong+","+myString;
                }
        }
+
+       public static class CustomType2 implements Serializable { // key type returned by the selector in testGroupByKeySelector5
+
+               public int myInt;
+               public int[] myIntArray; // array-typed member
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
new file mode 100644
index 0000000..a4e2bbc
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SortPartitionTest { // checks which sortPartition() keys are accepted and which raise InvalidProgramException
+
+       // TUPLE DATA
+       private final List<Tuple5<Integer, Long, String, Long, Integer>> 
emptyTupleData =
+                       new ArrayList<Tuple5<Integer, Long, String, Long, 
Integer>>();
+
+       private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, 
Integer>> tupleTypeInfo = new
+                       TupleTypeInfo<Tuple5<Integer, Long, String, Long, 
Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       BasicTypeInfo.STRING_TYPE_INFO,
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> 
tupleWithCustomInfo = new
+                       TupleTypeInfo<Tuple4<Integer, Long, CustomType, 
Long[]>>(
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               TypeExtractor.createTypeInfo(CustomType.class),
+                               BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+                       );
+
+       // LONG DATA
+       private final List<Long> emptyLongData = new ArrayList<Long>(); // NOTE(review): not referenced by any test in this class
+
+       private final List<CustomType> customTypeData = new 
ArrayList<CustomType>(); // NOTE(review): not referenced by any test in this class
+
+       private final List<Tuple4<Integer, Long, CustomType, Long[]>> 
tupleWithCustomData =
+                       new ArrayList<Tuple4<Integer, Long, CustomType, 
Long[]>>();
+
+
+       @Test
+       public void testSortPartitionPositionKeys1() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = 
env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+               // should work: position 0 is the Integer field
+               try {
+                       tupleDs.sortPartition(0, Order.ASCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test
+       public void testSortPartitionPositionKeys2() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = 
env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+               // should work: chained sort on the Integer field 0 and the Long field 3
+               try {
+                       tupleDs
+                                       .sortPartition(0, Order.ASCENDING)
+                                       .sortPartition(3, Order.DESCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testSortPartitionWithPositionKeys3() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = 
env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+               // must not work: position 2 is the CustomType field of the tuple
+               tupleDs.sortPartition(2, Order.ASCENDING);
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testSortPartitionWithPositionKeys4() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = 
env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+               // must not work: position 3 is the Long[] field of the tuple
+               tupleDs.sortPartition(3, Order.ASCENDING);
+       }
+
+       @Test
+       public void testSortPartitionExpressionKeys1() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = 
env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+               // should work: "f1" is the Long field
+               try {
+                       tupleDs.sortPartition("f1", Order.ASCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test
+       public void testSortPartitionExpressionKeys2() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = 
env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+               // should work: "f2.nested.myInt" reaches the int inside CustomType.Nest
+               try {
+                       tupleDs
+                                       .sortPartition("f0", Order.ASCENDING)
+                                       .sortPartition("f2.nested.myInt", 
Order.DESCENDING);
+               } catch(Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testSortPartitionWithExpressionKeys3() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = 
env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+               // must not work: "f2.nested" selects the Nest object as a whole
+               tupleDs.sortPartition("f2.nested", Order.ASCENDING);
+       }
+
+       @Test(expected = InvalidProgramException.class)
+       public void testSortPartitionWithExpressionKeys4() {
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = 
env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+               // must not work: "f3" is the Long[] field of the tuple
+               tupleDs.sortPartition("f3", Order.ASCENDING);
+       }
+
+       public static class CustomType implements Serializable {
+               
+               public static class Nest {
+                       public int myInt;
+               }
+               private static final long serialVersionUID = 1L;
+               
+               public int myInt;
+               public long myLong;
+               public String myString;
+               public Nest nested;
+               
+               public CustomType() {}; // NOTE(review): the ';' after the body is a redundant empty declaration
+               
+               public CustomType(int i, long l, String s) {
+                       myInt = i;
+                       myLong = l;
+                       myString = s;
+               }
+               
+               @Override
+               public String toString() {
+                       return myInt+","+myLong+","+myString;
+               }
+       }
+
+       public static class CustomType2 implements Serializable { // NOTE(review): not referenced by any test in this class
+
+               public int myInt;
+               public int[] myIntArray;
+
+       }
+}

Reply via email to