[FLINK-1666][FLINK-1903][FLINK-3233] Refactor expression keys.

- Unify usage of expression keys: FLINK-1666
- Unify supported key expressions (incl. support for partitioning on atomic 
types: FLINK-3233)
- Remove the automatic removal of duplicate keys: FLINK-1903
- Unify checks for sort keys
- Add more tests for SelectorFunctionKeys and ExpressionKeys

This closes #1520


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af029e7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af029e7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af029e7e

Branch: refs/heads/master
Commit: af029e7e0c6377ec12edafedb4d6ea53c4fa9fe9
Parents: 9840136
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Jan 14 22:48:43 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Jan 26 11:16:58 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  12 +-
 .../flink/api/java/io/SplitDataProperties.java  |  52 +-
 .../api/java/operators/CoGroupOperator.java     |  31 +-
 .../flink/api/java/operators/DataSink.java      |  77 +--
 .../api/java/operators/DistinctOperator.java    |   2 +-
 .../apache/flink/api/java/operators/Keys.java   | 444 ++++++++---------
 .../api/java/operators/PartitionOperator.java   |   5 -
 .../java/operators/SortPartitionOperator.java   |  62 +--
 .../api/java/operators/SortedGrouping.java      |  70 +--
 .../api/java/operator/CoGroupOperatorTest.java  |   8 +-
 .../flink/api/java/operator/DataSinkTest.java   |  79 +--
 .../api/java/operator/DistinctOperatorTest.java |   2 +-
 .../operator/FullOuterJoinOperatorTest.java     |   2 +-
 .../flink/api/java/operator/GroupingTest.java   |   6 +-
 .../api/java/operator/JoinOperatorTest.java     |   8 +-
 .../operator/LeftOuterJoinOperatorTest.java     |   2 +-
 .../operator/RightOuterJoinOperatorTest.java    |   2 +-
 .../api/java/operators/ExpressionKeysTest.java  | 479 +++++++++++++++++++
 .../flink/api/java/operators/KeysTest.java      | 284 -----------
 .../operators/SelectorFunctionKeysTest.java     | 153 ++++++
 .../api/java/table/JavaBatchTranslator.scala    |   4 +-
 .../table/typeinfo/RenamingProxyTypeInfo.scala  |  21 +-
 .../apache/flink/api/scala/CoGroupDataSet.scala |  17 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  14 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |  18 +-
 .../test/javaApiOperators/DataSinkITCase.java   |  54 ++-
 .../test/javaApiOperators/PartitionITCase.java  |  79 ++-
 .../scala/operators/CoGroupOperatorTest.scala   |   8 +-
 .../scala/operators/DistinctOperatorTest.scala  |   2 +-
 .../api/scala/operators/GroupingTest.scala      |   6 +-
 .../api/scala/operators/JoinOperatorTest.scala  |   8 +-
 31 files changed, 1142 insertions(+), 869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index be84032..5633185 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -601,7 +601,7 @@ public abstract class DataSet<T> {
         * @return A DistinctOperator that represents the distinct DataSet.
         */
        public DistinctOperator<T> distinct(int... fields) {
-               return new DistinctOperator<>(this, new 
Keys.ExpressionKeys<>(fields, getType(), true), Utils.getCallLocationName());
+               return new DistinctOperator<>(this, new 
Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
        }
        
        /**
@@ -686,7 +686,7 @@ public abstract class DataSet<T> {
         * @see DataSet
         */
        public UnsortedGrouping<T> groupBy(int... fields) {
-               return new UnsortedGrouping<>(this, new 
Keys.ExpressionKeys<>(fields, getType(), false));
+               return new UnsortedGrouping<>(this, new 
Keys.ExpressionKeys<>(fields, getType()));
        }
 
        /**
@@ -1165,7 +1165,7 @@ public abstract class DataSet<T> {
                Preconditions.checkNotNull(workset);
                Preconditions.checkNotNull(keyPositions);
                
-               Keys.ExpressionKeys<T> keys = new 
Keys.ExpressionKeys<>(keyPositions, getType(), false);
+               Keys.ExpressionKeys<T> keys = new 
Keys.ExpressionKeys<>(keyPositions, getType());
                return new DeltaIteration<>(getExecutionEnvironment(), 
getType(), this, workset, keys, maxIterations);
        }
 
@@ -1214,7 +1214,7 @@ public abstract class DataSet<T> {
         * @return The partitioned DataSet.
         */
        public PartitionOperator<T> partitionByHash(int... fields) {
-               return new PartitionOperator<>(this, PartitionMethod.HASH, new 
Keys.ExpressionKeys<>(fields, getType(), false), Utils.getCallLocationName());
+               return new PartitionOperator<>(this, PartitionMethod.HASH, new 
Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
        }
        
        /**
@@ -1254,7 +1254,7 @@ public abstract class DataSet<T> {
         * @return The partitioned DataSet.
         */
        public PartitionOperator<T> partitionByRange(int... fields) {
-               return new PartitionOperator<>(this, PartitionMethod.RANGE, new 
Keys.ExpressionKeys<>(fields, getType(), false), Utils.getCallLocationName());
+               return new PartitionOperator<>(this, PartitionMethod.RANGE, new 
Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName());
        }
 
        /**
@@ -1297,7 +1297,7 @@ public abstract class DataSet<T> {
         * @return The partitioned DataSet.
         */
        public <K> PartitionOperator<T> partitionCustom(Partitioner<K> 
partitioner, int field) {
-               return new PartitionOperator<>(this, new 
Keys.ExpressionKeys<>(new int[] {field}, getType(), false), clean(partitioner), 
Utils.getCallLocationName());
+               return new PartitionOperator<>(this, new 
Keys.ExpressionKeys<>(new int[] {field}, getType()), clean(partitioner), 
Utils.getCallLocationName());
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index 56d2a64..c44929f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Keys;
 
@@ -124,7 +123,7 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
 
                this.splitPartitionKeys = getAllFlatKeys(partitionFields);
                if (partitionMethodId != null) {
-                       this.splitPartitioner = new 
SourcePartitionerMarker<T>(partitionMethodId);
+                       this.splitPartitioner = new 
SourcePartitionerMarker<>(partitionMethodId);
                } else {
                        this.splitPartitioner = null;
                }
@@ -175,7 +174,7 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
 
                this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
                if(partitionMethodId != null) {
-                       this.splitPartitioner = new 
SourcePartitionerMarker<T>(partitionMethodId);
+                       this.splitPartitioner = new 
SourcePartitionerMarker<>(partitionMethodId);
                }
                else {
                        this.splitPartitioner = null;
@@ -331,7 +330,8 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
 
                for(int i=0; i<orderKeysA.length; i++) {
                        String keyExp = orderKeysA[i];
-                       int[] flatKeys = this.computeFlatKeys(keyExp);
+                       Keys.ExpressionKeys<T> ek = new 
Keys.ExpressionKeys<>(keyExp, this.type);
+                       int[] flatKeys = ek.computeLogicalKeyPositions();
 
                        for(int key : flatKeys) {
                                // check for duplicates
@@ -371,7 +371,9 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
                int[] allKeys = null;
 
                for(String keyExp : fieldExpressions) {
-                       int[] flatKeys = this.computeFlatKeys(keyExp);
+                       Keys.ExpressionKeys<T> ek = new 
Keys.ExpressionKeys<>(keyExp, this.type);
+                       int[] flatKeys = ek.computeLogicalKeyPositions();
+
                        if(allKeys == null) {
                                allKeys = flatKeys;
                        } else {
@@ -387,9 +389,7 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
                                int oldLength = allKeys.length;
                                int newLength = oldLength + flatKeys.length;
                                allKeys = Arrays.copyOf(allKeys, newLength);
-                               for(int i=0;i<flatKeys.length; i++) {
-                                       allKeys[oldLength+i] = flatKeys[i];
-                               }
+                               System.arraycopy(flatKeys, 0, allKeys, 
oldLength, flatKeys.length);
                        }
                }
 
@@ -397,47 +397,17 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
        }
 
        private int[] getAllFlatKeys(int[] fieldPositions) {
-
-               Keys.ExpressionKeys<T> ek;
-               try {
-                       ek = new Keys.ExpressionKeys<T>(fieldPositions, 
this.type);
-               } catch(IllegalArgumentException iae) {
-                       throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
-               }
+               Keys.ExpressionKeys<T> ek = new 
Keys.ExpressionKeys<>(fieldPositions, this.type);
                return ek.computeLogicalKeyPositions();
        }
 
-
-       private int[] computeFlatKeys(String fieldExpression) {
-
-               fieldExpression = fieldExpression.trim();
-
-               if(this.type instanceof CompositeType) {
-                       // compute flat field positions for (nested) sorting 
fields
-                       Keys.ExpressionKeys<T> ek;
-                       try {
-                               ek = new Keys.ExpressionKeys<T>(new 
String[]{fieldExpression}, this.type);
-                       } catch(IllegalArgumentException iae) {
-                               throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
-                       }
-                       return ek.computeLogicalKeyPositions();
-               } else {
-                       fieldExpression = fieldExpression.trim();
-                       if (!(fieldExpression.equals("*") || 
fieldExpression.equals("_"))) {
-                               throw new InvalidProgramException("Data 
properties on non-composite types can only be defined on the full type. " +
-                                               "Use a field wildcard for that 
(\"*\" or \"_\")");
-                       } else {
-                               return new int[]{0};
-                       }
-               }
-       }
-
        /**
         * A custom partitioner to mark compatible split partitionings.
         *
         * @param <T> The type of the partitioned data.
         */
        public static class SourcePartitionerMarker<T> implements 
Partitioner<T> {
+               private static final long serialVersionUID = 
-8554223652384442571L;
 
                String partitionMarker;
 
@@ -454,7 +424,7 @@ public class SplitDataProperties<T> implements 
GenericDataSourceBase.SplitDataPr
                @Override
                public boolean equals(Object o) {
                        if(o instanceof SourcePartitionerMarker) {
-                               return 
this.partitionMarker.equals(((SourcePartitionerMarker) o).partitionMarker);
+                               return 
this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker);
                        } else {
                                return false;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index ca41fc5..845deb4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
@@ -633,13 +632,8 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                                 * @see Order
                                 */
                                public CoGroupOperatorWithoutFunction 
sortFirstGroup(int field, Order order) {
-                                       if (!input1.getType().isTupleType()) {
-                                               throw new 
InvalidProgramException("Specifying order keys via field positions is only 
valid for tuple data types");
-                                       }
-                                       if (field >= 
input1.getType().getArity()) {
-                                               throw new 
IllegalArgumentException("Order key out of tuple bounds.");
-                                       }
-                                       ExpressionKeys<I1> ek = new 
ExpressionKeys<>(new int[]{field}, input1.getType());
+
+                                       ExpressionKeys<I1> ek = new 
ExpressionKeys<>(field, input1.getType());
                                        int[] groupOrderKeys = 
ek.computeLogicalKeyPositions();
                                        
                                        for (int key : groupOrderKeys) {
@@ -663,13 +657,8 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                                 * @see Order
                                 */
                                public CoGroupOperatorWithoutFunction 
sortSecondGroup(int field, Order order) {
-                                       if (!input2.getType().isTupleType()) {
-                                               throw new 
InvalidProgramException("Specifying order keys via field positions is only 
valid for tuple data types");
-                                       }
-                                       if (field >= 
input2.getType().getArity()) {
-                                               throw new 
IllegalArgumentException("Order key out of tuple bounds.");
-                                       }
-                                       ExpressionKeys<I2> ek = new 
ExpressionKeys<>(new int[]{field}, input2.getType());
+
+                                       ExpressionKeys<I2> ek = new 
ExpressionKeys<>(field, input2.getType());
                                        int[] groupOrderKeys = 
ek.computeLogicalKeyPositions();
                                        
                                        for (int key : groupOrderKeys) {
@@ -691,10 +680,8 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                                 * @see Order
                                 */
                                public CoGroupOperatorWithoutFunction 
sortFirstGroup(String fieldExpression, Order order) {
-                                       if (! (input1.getType() instanceof 
CompositeType)) {
-                                               throw new 
InvalidProgramException("Specifying order keys via field positions is only 
valid for composite data types (pojo / tuple / case class)");
-                                       }
-                                       ExpressionKeys<I1> ek = new 
ExpressionKeys<>(new String[]{fieldExpression}, input1.getType());
+
+                                       ExpressionKeys<I1> ek = new 
ExpressionKeys<>(fieldExpression, input1.getType());
                                        int[] groupOrderKeys = 
ek.computeLogicalKeyPositions();
                                        
                                        for (int key : groupOrderKeys) {
@@ -716,10 +703,8 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                                 * @see Order
                                 */
                                public CoGroupOperatorWithoutFunction 
sortSecondGroup(String fieldExpression, Order order) {
-                                       if (! (input2.getType() instanceof 
CompositeType)) {
-                                               throw new 
InvalidProgramException("Specifying order keys via field positions is only 
valid for composite data types (pojo / tuple / case class)");
-                                       }
-                                       ExpressionKeys<I2> ek = new 
ExpressionKeys<>(new String[]{fieldExpression}, input2.getType());
+
+                                       ExpressionKeys<I2> ek = new 
ExpressionKeys<>(fieldExpression, input2.getType());
                                        int[] groupOrderKeys = 
ek.computeLogicalKeyPositions();
                                        
                                        for (int key : groupOrderKeys) {

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 3371665..915a053 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -27,8 +27,6 @@ import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.java.DataSet;
 
@@ -109,23 +107,14 @@ public class DataSink<T> {
        @Deprecated
        public DataSink<T> sortLocalOutput(int field, Order order) {
 
-               if (!this.type.isTupleType()) {
-                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for tuple data types");
-               }
-               if (field >= this.type.getArity()) {
-                       throw new InvalidProgramException("Order key out of 
tuple bounds.");
-               }
-               isValidSortKeyType(field);
-
                // get flat keys
-               Keys.ExpressionKeys<T> ek;
-               try {
-                       ek = new Keys.ExpressionKeys<>(new int[]{field}, 
this.type);
-               } catch(IllegalArgumentException iae) {
-                       throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
-               }
+               Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(field, 
this.type);
                int[] flatKeys = ek.computeLogicalKeyPositions();
 
+               if (!Keys.ExpressionKeys.isSortKey(field, this.type)) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
+               }
+
                if(this.sortKeyPositions == null) {
                        // set sorting info
                        this.sortKeyPositions = flatKeys;
@@ -168,34 +157,18 @@ public class DataSink<T> {
                int[] fields;
                Order[] orders;
 
-               if(this.type instanceof CompositeType) {
+               // compute flat field positions for (nested) sorting fields
+               Keys.ExpressionKeys<T> ek = new 
Keys.ExpressionKeys<>(fieldExpression, this.type);
+               fields = ek.computeLogicalKeyPositions();
 
-                       // compute flat field positions for (nested) sorting 
fields
-                       Keys.ExpressionKeys<T> ek;
-                       try {
-                               isValidSortKeyType(fieldExpression);
-                               ek = new Keys.ExpressionKeys<>(new 
String[]{fieldExpression}, this.type);
-                       } catch(IllegalArgumentException iae) {
-                               throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
-                       }
-                       fields = ek.computeLogicalKeyPositions();
-                       numFields = fields.length;
-                       orders = new Order[numFields];
-                       Arrays.fill(orders, order);
-               } else {
-                       fieldExpression = fieldExpression.trim();
-                       if (!(fieldExpression.equals("*") || 
fieldExpression.equals("_"))) {
-                               throw new InvalidProgramException("Output 
sorting of non-composite types can only be defined on the full type. " +
-                                               "Use a field wildcard for that 
(\"*\" or \"_\")");
-                       } else {
-                               isValidSortKeyType(fieldExpression);
-
-                               numFields = 1;
-                               fields = new int[]{0};
-                               orders = new Order[]{order};
-                       }
+               if (!Keys.ExpressionKeys.isSortKey(fieldExpression, this.type)) 
{
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
 
+               numFields = fields.length;
+               orders = new Order[numFields];
+               Arrays.fill(orders, order);
+
                if(this.sortKeyPositions == null) {
                        // set sorting info
                        this.sortKeyPositions = fields;
@@ -215,28 +188,6 @@ public class DataSink<T> {
                return this;
        }
 
-       private void isValidSortKeyType(int field) {
-               TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) 
this.type).getTypeAt(field);
-               if (!sortKeyType.isSortKeyType()) {
-                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
-               }
-       }
-
-       private void isValidSortKeyType(String field) {
-               TypeInformation<?> sortKeyType;
-
-               field = field.trim();
-               if(field.equals("*") || field.equals("_")) {
-                       sortKeyType = this.type;
-               } else {
-                       sortKeyType = ((CompositeType<?>) 
this.type).getTypeAt(field);
-               }
-
-               if (!sortKeyType.isSortKeyType()) {
-                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
-               }
-       }
-
        /**
         * @return Configuration for the OutputFormat.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index d1d208a..d85c9a6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -50,7 +50,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
 
                // if keys is null distinction is done on all fields
                if (keys == null) {
-                       keys = new Keys.ExpressionKeys<>(new String[] 
{Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
+                       keys = new Keys.ExpressionKeys<>(input.getType());
                }
 
                this.keys = keys;

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 95ca300..5992f0b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -19,8 +19,6 @@
 package org.apache.flink.api.java.operators;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.base.Joiner;
@@ -31,7 +29,6 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
@@ -43,34 +40,44 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
 
 public abstract class Keys<T> {
-       private static final Logger LOG = LoggerFactory.getLogger(Keys.class);
 
        public abstract int getNumberOfKeyFields();
 
+       public abstract int[] computeLogicalKeyPositions();
+
+       protected abstract TypeInformation<?>[] getKeyFieldTypes();
+
+       public abstract <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo);
+
        public boolean isEmpty() {
                return getNumberOfKeyFields() == 0;
        }
-       
+
        /**
         * Check if two sets of keys are compatible to each other (matching 
types, key counts)
         */
-       public abstract boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException;
-       
-       public abstract int[] computeLogicalKeyPositions();
-       
-       public abstract <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo);
-       
-       
+       public boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException {
+
+               TypeInformation<?>[] thisKeyFieldTypes = 
this.getKeyFieldTypes();
+               TypeInformation<?>[] otherKeyFieldTypes = 
other.getKeyFieldTypes();
+
+               if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) {
+                       throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
+               } else {
+                       for (int i = 0; i < thisKeyFieldTypes.length; i++) {
+                               if 
(!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) {
+                                       throw new 
IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] );
+                               }
+                       }
+               }
+               return true;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Specializations for expression-based / extractor-based grouping
        // 
--------------------------------------------------------------------------------------------
@@ -81,31 +88,30 @@ public abstract class Keys<T> {
                private final KeySelector<T, K> keyExtractor;
                private final TypeInformation<T> inputType;
                private final TypeInformation<K> keyType;
-               private final int[] logicalKeyFields;
+               private final List<FlatFieldDescriptor> keyFields;
 
                public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, 
TypeInformation<T> inputType, TypeInformation<K> keyType) {
+
                        if (keyExtractor == null) {
                                throw new NullPointerException("Key extractor 
must not be null.");
                        }
                        if (keyType == null) {
                                throw new NullPointerException("Key type must 
not be null.");
                        }
+                       if (!keyType.isKeyType()) {
+                               throw new InvalidProgramException("Return type 
"+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key 
type");
+                       }
 
                        this.keyExtractor = keyExtractor;
                        this.inputType = inputType;
                        this.keyType = keyType;
 
-                       if(!keyType.isKeyType()) {
-                               throw new InvalidProgramException("Return type 
"+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key 
type");
+                       if (keyType instanceof CompositeType) {
+                               this.keyFields = 
((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
                        }
-
-                       // we have to handle a special case here:
-                       // if the keyType is a composite type, we need to 
select the full type with all its fields.
-                       if(keyType instanceof CompositeType) {
-                               ExpressionKeys<K> ek = new ExpressionKeys<>(new 
String[]{ExpressionKeys.SELECT_ALL_CHAR}, keyType);
-                               logicalKeyFields = 
ek.computeLogicalKeyPositions();
-                       } else {
-                               logicalKeyFields = new int[] {0};
+                       else {
+                               this.keyFields = new ArrayList<>(1);
+                               this.keyFields.add(new FlatFieldDescriptor(0, 
keyType));
                        }
                }
 
@@ -123,61 +129,36 @@ public abstract class Keys<T> {
 
                @Override
                public int getNumberOfKeyFields() {
-                       return logicalKeyFields.length;
+                       return keyFields.size();
                }
 
                @Override
-               public boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException {
-                       
-                       if (other instanceof SelectorFunctionKeys) {
-                               @SuppressWarnings("unchecked")
-                               SelectorFunctionKeys<?, K> sfk = 
(SelectorFunctionKeys<?, K>) other;
-
-                               return sfk.keyType.equals(this.keyType);
-                       }
-                       else if (other instanceof ExpressionKeys) {
-                               ExpressionKeys<?> expressionKeys = 
(ExpressionKeys<?>) other;
-                               
-                               if(keyType.isTupleType()) {
-                                       // special case again:
-                                       TupleTypeInfoBase<?> tupleKeyType = 
(TupleTypeInfoBase<?>) keyType;
-                                       List<FlatFieldDescriptor> keyTypeFields 
= tupleKeyType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
-                                       if(expressionKeys.keyFields.size() != 
keyTypeFields.size()) {
-                                               throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
-                                       }
-                                       for(int i=0; i < 
expressionKeys.keyFields.size(); i++) {
-                                               
if(!expressionKeys.keyFields.get(i).getType().equals(keyTypeFields.get(i).getType()))
 {
-                                                       throw new 
IncompatibleKeysException(expressionKeys.keyFields.get(i).getType(), 
keyTypeFields.get(i).getType() );
-                                               }
-                                       }
-                                       return true;
-                               }
-                               if(expressionKeys.getNumberOfKeyFields() != 1) {
-                                       throw new 
IncompatibleKeysException("Key selector functions are only compatible to one 
key");
-                               }
-                               
-                               
if(expressionKeys.keyFields.get(0).getType().equals(this.keyType)) {
-                                       return true;
-                               } else {
-                                       throw new 
IncompatibleKeysException(expressionKeys.keyFields.get(0).getType(), 
this.keyType);
-                               }
-                       } else {
-                               throw new IncompatibleKeysException("The key is 
not compatible with "+other);
+               public int[] computeLogicalKeyPositions() {
+                       int[] logicalKeys = new int[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               logicalKeys[i] = keyFields.get(i).getPosition();
                        }
+                       return logicalKeys;
                }
 
                @Override
-               public int[] computeLogicalKeyPositions() {
-                       return logicalKeyFields;
+               protected TypeInformation<?>[] getKeyFieldTypes() {
+                       TypeInformation<?>[] fieldTypes = new 
TypeInformation[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               fieldTypes[i] = keyFields.get(i).getType();
+                       }
+                       return fieldTypes;
                }
                
                @Override
                public <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo) {
-                       if (logicalKeyFields.length != 1) {
+
+                       if (keyFields.size() != 1) {
                                throw new InvalidProgramException("Custom 
partitioners can only be used with keys that have one key field.");
                        }
                        
                        if (typeInfo == null) {
+                               // try to extract key type from partitioner
                                try {
                                        typeInfo = 
TypeExtractor.getPartitionerTypes(partitioner);
                                }
@@ -185,10 +166,14 @@ public abstract class Keys<T> {
                                        // best effort check, so we ignore 
exceptions
                                }
                        }
-                       
-                       if (typeInfo != null && !(typeInfo instanceof 
GenericTypeInfo) && (!keyType.equals(typeInfo))) {
-                               throw new InvalidProgramException("The 
partitioner is incompatible with the key type. "
+
+                       // only check if type is known and not a generic type
+                       if (typeInfo != null && !(typeInfo instanceof 
GenericTypeInfo)) {
+                               // check equality of key and partitioner type
+                               if (!keyType.equals(typeInfo)) {
+                                       throw new InvalidProgramException("The 
partitioner is incompatible with the key type. "
                                                + "Partitioner type: " + 
typeInfo + " , key type: " + keyType);
+                               }
                        }
                }
 
@@ -281,132 +266,144 @@ public abstract class Keys<T> {
        
        
        /**
-        * Represents (nested) field access through string and integer-based 
keys for Composite Types (Tuple or Pojo)
+        * Represents (nested) field access through string and integer-based 
keys
         */
        public static class ExpressionKeys<T> extends Keys<T> {
                
                public static final String SELECT_ALL_CHAR = "*";
                public static final String SELECT_ALL_CHAR_SCALA = "_";
                
+               // Flattened fields representing keys fields
+               private List<FlatFieldDescriptor> keyFields;
+
                /**
-                * Flattened fields representing keys fields
+                * ExpressionKeys that is defined by the full data type.
                 */
-               private List<FlatFieldDescriptor> keyFields;
-               
+               public ExpressionKeys(TypeInformation<T> type) {
+                       this(SELECT_ALL_CHAR, type);
+               }
+
                /**
-                * two constructors for field-based (tuple-type) keys
+                * Create int-based (non-nested) field position keys on a tuple 
type.
                 */
-               public ExpressionKeys(int[] groupingFields, TypeInformation<T> 
type) {
-                       this(groupingFields, type, false);
+               public ExpressionKeys(int keyPosition, TypeInformation<T> type) 
{
+                       this(new int[]{keyPosition}, type, false);
                }
 
-               // int-defined field
-               public ExpressionKeys(int[] groupingFields, TypeInformation<T> 
type, boolean allowEmpty) {
-                       if (!type.isTupleType()) {
+               /**
+                * Create int-based (non-nested) field position keys on a tuple 
type.
+                */
+               public ExpressionKeys(int[] keyPositions, TypeInformation<T> 
type) {
+                       this(keyPositions, type, false);
+               }
+
+               /**
+                * Create int-based (non-nested) field position keys on a tuple 
type.
+                */
+               public ExpressionKeys(int[] keyPositions, TypeInformation<T> 
type, boolean allowEmpty) {
+
+                       if (!type.isTupleType() || !(type instanceof 
CompositeType)) {
                                throw new InvalidProgramException("Specifying 
keys via field positions is only valid " +
                                                "for tuple data types. Type: " 
+ type);
                        }
                        if (type.getArity() == 0) {
                                throw new InvalidProgramException("Tuple size 
must be greater than 0. Size: " + type.getArity());
                        }
-
-                       if (!allowEmpty && (groupingFields == null || 
groupingFields.length == 0)) {
+                       if (!allowEmpty && (keyPositions == null || 
keyPositions.length == 0)) {
                                throw new IllegalArgumentException("The 
grouping fields must not be empty.");
                        }
-                       // select all fields. Therefore, set all fields on this 
tuple level and let the logic handle the rest
-                       // (makes type assignment easier).
-                       if (groupingFields == null || groupingFields.length == 
0) {
-                               groupingFields = new int[type.getArity()];
-                               for (int i = 0; i < groupingFields.length; i++) 
{
-                                       groupingFields[i] = i;
-                               }
+
+                       this.keyFields = new ArrayList<>();
+
+                       if (keyPositions == null || keyPositions.length == 0) {
+                               // use all tuple fields as key fields
+                               keyPositions = 
createIncrIntArray(type.getArity());
                        } else {
-                               groupingFields = 
rangeCheckFields(groupingFields, type.getArity() -1);
+                               rangeCheckFields(keyPositions, type.getArity() 
- 1);
                        }
-                       Preconditions.checkArgument(groupingFields.length > 0, 
"Grouping fields can not be empty at this point");
-                       
-                       keyFields = new ArrayList<>(type.getTotalFields());
-                       // for each key, find the field:
-                       for (int keyPos : groupingFields) {
-                               int offset = 0;
-                               for (int i = 0; i < type.getArity(); i++) {
-
-                                       TypeInformation<?> fieldType = 
((CompositeType<?>) type).getTypeAt(i);
-                                       if (i < keyPos) {
-                                               // not yet there, increment key 
offset
-                                               offset += 
fieldType.getTotalFields();
-                                       } else {
-                                               // arrived at key position
-                                               if (!fieldType.isKeyType()) {
-                                                       throw new 
InvalidProgramException("This type (" + fieldType + ") cannot be used as key.");
-                                               }
-                                               if (fieldType instanceof 
CompositeType) {
-                                                       // add all nested 
fields of composite type
-                                                       ((CompositeType<?>) 
fieldType).getFlatFields("*", offset, keyFields);
-                                               } else if (fieldType instanceof 
AtomicType) {
-                                                       // add atomic type field
-                                                       keyFields.add(new 
FlatFieldDescriptor(offset, fieldType));
-                                               } else {
-                                                       // type should either 
be composite or atomic
-                                                       throw new 
InvalidProgramException("Field type is neither CompositeType nor AtomicType: " 
+ fieldType);
-                                               }
-                                               // go to next key
-                                               break;
+                       Preconditions.checkArgument(keyPositions.length > 0, 
"Grouping fields can not be empty at this point");
+
+                       // extract key field types
+                       CompositeType<T> cType = (CompositeType<T>)type;
+                       this.keyFields = new ArrayList<>(type.getTotalFields());
+
+                       // for each key position, find all (nested) field types
+                       String[] fieldNames = cType.getFieldNames();
+                       ArrayList<FlatFieldDescriptor> tmpList = new 
ArrayList<>();
+                       for (int keyPos : keyPositions) {
+                               tmpList.clear();
+                               // get all flat fields
+                               cType.getFlatFields(fieldNames[keyPos], 0, 
tmpList);
+                               // check if fields are of key type
+                               for(FlatFieldDescriptor ffd : tmpList) {
+                                       if(!ffd.getType().isKeyType()) {
+                                               throw new 
InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as 
key.");
                                        }
                                }
+                               this.keyFields.addAll(tmpList);
                        }
-                       keyFields = removeNullElementsFromList(keyFields);
                }
 
-               public static <R> List<R> removeNullElementsFromList(List<R> 
in) {
-                       List<R> elements = new ArrayList<>();
-                       for(R e: in) {
-                               if(e != null) {
-                                       elements.add(e);
-                               }
-                       }
-                       return elements;
+               /**
+                * Create String-based (nested) field expression keys on a 
composite type.
+                */
+               public ExpressionKeys(String keyExpression, TypeInformation<T> 
type) {
+                       this(new String[]{keyExpression}, type);
                }
-               
+
                /**
-                * Create ExpressionKeys from String-expressions
+                * Create String-based (nested) field expression keys on a 
composite type.
                 */
-               public ExpressionKeys(String[] expressionsIn, 
TypeInformation<T> type) {
-                       Preconditions.checkNotNull(expressionsIn, "Field 
expression cannot be null.");
+               public ExpressionKeys(String[] keyExpressions, 
TypeInformation<T> type) {
+                       Preconditions.checkNotNull(keyExpressions, "Field 
expression cannot be null.");
 
-                       if (type instanceof AtomicType) {
-                               if (!type.isKeyType()) {
-                                       throw new InvalidProgramException("This 
type (" + type + ") cannot be used as key.");
-                               } else if (expressionsIn.length != 1 || 
!(Keys.ExpressionKeys.SELECT_ALL_CHAR.equals(expressionsIn[0]) || 
Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA.equals(expressionsIn[0]))) {
-                                       throw new 
InvalidProgramException("Field expression for atomic type must be equal to '*' 
or '_'.");
-                               }
+                       this.keyFields = new ArrayList<>(keyExpressions.length);
 
-                               keyFields = new ArrayList<>(1);
-                               keyFields.add(new FlatFieldDescriptor(0, type));
-                       } else {
+                       if (type instanceof CompositeType){
                                CompositeType<T> cType = (CompositeType<T>) 
type;
 
-                               String[] expressions = 
removeDuplicates(expressionsIn);
-                               if(expressionsIn.length != expressions.length) {
-                                       LOG.warn("The key expressions contained 
duplicates. They are now unique");
-                               }
                                // extract the keys on their flat position
-                               keyFields = new ArrayList<>(expressions.length);
-                               for (String expression : expressions) {
-                                       List<FlatFieldDescriptor> keys = 
cType.getFlatFields(expression); // use separate list to do a size check
-                                       for (FlatFieldDescriptor key : keys) {
-                                               TypeInformation<?> keyType = 
key.getType();
-                                               if (!keyType.isKeyType()) {
-                                                       throw new 
InvalidProgramException("This type (" + key.getType() + ") cannot be used as 
key.");
-                                               }
-                                               if (!(keyType instanceof 
AtomicType || keyType instanceof CompositeType)) {
-                                                       throw new 
InvalidProgramException("Field type is neither CompositeType nor AtomicType: " 
+ keyType);
+                               for (String keyExpr : keyExpressions) {
+                                       if (keyExpr == null) {
+                                               throw new 
InvalidProgramException("Expression key may not be null.");
+                                       }
+                                       // strip off whitespace
+                                       keyExpr = keyExpr.trim();
+
+                                       List<FlatFieldDescriptor> flatFields = 
cType.getFlatFields(keyExpr);
+
+                                       if (flatFields.size() == 0) {
+                                               throw new 
InvalidProgramException("Unable to extract key from expression '" + keyExpr + 
"' on key " + cType);
+                                       }
+                                       // check if all nested fields can be 
used as keys
+                                       for (FlatFieldDescriptor field : 
flatFields) {
+                                               if 
(!field.getType().isKeyType()) {
+                                                       throw new 
InvalidProgramException("This type (" + field.getType() + ") cannot be used as 
key.");
                                                }
                                        }
-                                       if (keys.size() == 0) {
-                                               throw new 
InvalidProgramException("Unable to extract key from expression '" + expression 
+ "' on key " + cType);
+                                       // add flat fields to key fields
+                                       keyFields.addAll(flatFields);
+                               }
+                       }
+                       else {
+                               if (!type.isKeyType()) {
+                                       throw new InvalidProgramException("This 
type (" + type + ") cannot be used as key.");
+                               }
+
+                               // check that all key expressions are valid
+                               for (String keyExpr : keyExpressions) {
+                                       if (keyExpr == null) {
+                                               throw new 
InvalidProgramException("Expression key may not be null.");
+                                       }
+                                       // strip off whitespace
+                                       keyExpr = keyExpr.trim();
+                                       // check that full type is addressed
+                                       if (!(SELECT_ALL_CHAR.equals(keyExpr) 
|| SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
+                                               throw new 
InvalidProgramException(
+                                                       "Field expression must 
be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for 
non-composite types.");
                                        }
-                                       keyFields.addAll(keys);
+                                       // add full type as key
+                                       keyFields.add(new 
FlatFieldDescriptor(0, type));
                                }
                        }
                }
@@ -420,43 +417,32 @@ public abstract class Keys<T> {
                }
 
                @Override
-               public boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException {
-
-                       if (other instanceof ExpressionKeys) {
-                               ExpressionKeys<?> oKey = (ExpressionKeys<?>) 
other;
-
-                               if(oKey.getNumberOfKeyFields() != 
this.getNumberOfKeyFields() ) {
-                                       throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
-                               }
-                               for(int i=0; i < this.keyFields.size(); i++) {
-                                       
if(!this.keyFields.get(i).getType().equals(oKey.keyFields.get(i).getType())) {
-                                               throw new 
IncompatibleKeysException(this.keyFields.get(i).getType(), 
oKey.keyFields.get(i).getType() );
-                                       }
-                               }
-                               return true;
-                       } else if(other instanceof SelectorFunctionKeys<?, ?>) {
-                               return other.areCompatible(this);
-                       } else {
-                               throw new IncompatibleKeysException("The key is 
not compatible with "+other);
+               public int[] computeLogicalKeyPositions() {
+                       int[] logicalKeys = new int[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               logicalKeys[i] = keyFields.get(i).getPosition();
                        }
+                       return logicalKeys;
                }
 
                @Override
-               public int[] computeLogicalKeyPositions() {
-                       List<Integer> logicalKeys = new ArrayList<>();
-                       for (FlatFieldDescriptor kd : keyFields) {
-                               logicalKeys.add(kd.getPosition());
+               protected TypeInformation<?>[] getKeyFieldTypes() {
+                       TypeInformation<?>[] fieldTypes = new 
TypeInformation[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               fieldTypes[i] = keyFields.get(i).getType();
                        }
-                       return Ints.toArray(logicalKeys);
+                       return fieldTypes;
                }
-               
+
                @Override
                public <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo) {
+
                        if (keyFields.size() != 1) {
                                throw new InvalidProgramException("Custom 
partitioners can only be used with keys that have one key field.");
                        }
-                       
+
                        if (typeInfo == null) {
+                               // try to extract key type from partitioner
                                try {
                                        typeInfo = 
TypeExtractor.getPartitionerTypes(partitioner);
                                }
@@ -464,8 +450,10 @@ public abstract class Keys<T> {
                                        // best effort check, so we ignore 
exceptions
                                }
                        }
-                       
+
                        if (typeInfo != null && !(typeInfo instanceof 
GenericTypeInfo)) {
+                               // only check type compatibility if type is 
known and not a generic type
+
                                TypeInformation<?> keyType = 
keyFields.get(0).getType();
                                if (!keyType.equals(typeInfo)) {
                                        throw new InvalidProgramException("The 
partitioner is incompatible with the key type. "
@@ -479,51 +467,71 @@ public abstract class Keys<T> {
                        Joiner join = Joiner.on('.');
                        return "ExpressionKeys: " + join.join(keyFields);
                }
-       }
-       
-       private static String[] removeDuplicates(String[] in) {
-               List<String> ret = new LinkedList<>();
-               for(String el : in) {
-                       if(!ret.contains(el)) {
-                               ret.add(el);
+
+               public static boolean isSortKey(int fieldPos, 
TypeInformation<?> type) {
+
+                       if (!type.isTupleType() || !(type instanceof 
CompositeType)) {
+                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid " +
+                                       "for tuple data types. Type: " + type);
+                       }
+                       if (type.getArity() == 0) {
+                               throw new InvalidProgramException("Tuple size 
must be greater than 0. Size: " + type.getArity());
+                       }
+
+                       if(fieldPos < 0 || fieldPos >= type.getArity()) {
+                               throw new IndexOutOfBoundsException("Tuple 
position is out of range: " + fieldPos);
+                       }
+
+                       TypeInformation<?> sortKeyType = 
((CompositeType<?>)type).getTypeAt(fieldPos);
+                       return sortKeyType.isSortKeyType();
+               }
+
+               public static boolean isSortKey(String fieldExpr, 
TypeInformation<?> type) {
+
+                       TypeInformation<?> sortKeyType;
+
+                       fieldExpr = fieldExpr.trim();
+                       if (SELECT_ALL_CHAR.equals(fieldExpr) || 
SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) {
+                               sortKeyType = type;
+                       }
+                       else {
+                               if (type instanceof CompositeType) {
+                                       sortKeyType = ((CompositeType<?>) 
type).getTypeAt(fieldExpr);
+                               }
+                               else {
+                                       throw new InvalidProgramException(
+                                               "Field expression must be equal 
to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic 
types.");
+                               }
                        }
+
+                       return sortKeyType.isSortKeyType();
                }
-               return ret.toArray(new String[ret.size()]);
+
        }
+
        // 
--------------------------------------------------------------------------------------------
-       
-       
+
+
+
        // 
--------------------------------------------------------------------------------------------
        //  Utilities
        // 
--------------------------------------------------------------------------------------------
 
 
-       private static int[] rangeCheckFields(int[] fields, int 
maxAllowedField) {
-
-               // range check and duplicate eliminate
-               int i = 1, k = 0;
-               int last = fields[0];
-
-               if (last < 0 || last > maxAllowedField) {
-                       throw new IllegalArgumentException("Tuple position is 
out of range: " + last);
+       private static int[] createIncrIntArray(int numKeys) {
+               int[] keyFields = new int[numKeys];
+               for (int i = 0; i < numKeys; i++) {
+                       keyFields[i] = i;
                }
+               return keyFields;
+       }
 
-               for (; i < fields.length; i++) {
-                       if (fields[i] < 0 || fields[i] > maxAllowedField) {
-                               throw new IllegalArgumentException("Tuple 
position is out of range.");
-                       }
-                       if (fields[i] != last) {
-                               k++;
-                               last = fields[i];
-                               fields[k] = fields[i];
-                       }
-               }
+       private static void rangeCheckFields(int[] fields, int maxAllowedField) 
{
 
-               // check if we eliminated something
-               if (k == fields.length - 1) {
-                       return fields;
-               } else {
-                       return Arrays.copyOfRange(fields, 0, k+1);
+               for (int f : fields) {
+                       if (f < 0 || f > maxAllowedField) {
+                               throw new IndexOutOfBoundsException("Tuple 
position is out of range: " + f);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index c3d46f2..1384ca2 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -71,10 +70,6 @@ public class PartitionOperator<T> extends 
SingleInputOperator<T, T, PartitionOpe
                Preconditions.checkArgument(pKeys != null || pMethod == 
PartitionMethod.REBALANCE, "Partitioning requires keys");
                Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM 
|| customPartitioner != null, "Custom partioning requires a partitioner.");
 
-               if (pKeys instanceof Keys.ExpressionKeys<?> && 
!(input.getType() instanceof CompositeType) ) {
-                       throw new IllegalArgumentException("Hash Partitioning 
with key fields only possible on Tuple or POJO DataSets");
-               }
-               
                if (customPartitioner != null) {
                        pKeys.validateCustomPartitioner(customPartitioner, 
partitionerTypeInfo);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
index 3fce01b..d65bc68 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
@@ -24,10 +24,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 import java.util.Arrays;
 
@@ -97,61 +94,22 @@ public class SortPartitionOperator<T> extends 
SingleInputOperator<T, T, SortPart
 
        private int[] getFlatFields(int field) {
 
-               if(!(super.getType() instanceof TupleTypeInfoBase<?>)) {
-                       throw new InvalidProgramException("Field positions can 
only be specified on Tuple or " +
-                                       "Case Class types.");
-               }
-               else {
-                       // check selected field is sortable type
-                       TypeInformation<?> sortKeyType = 
((TupleTypeInfoBase<?>) super.getType()).getTypeAt(field);
-                       if (!sortKeyType.isSortKeyType()) {
-                               throw new InvalidProgramException("Selected 
sort key is not a sortable type " + sortKeyType);
-                       }
+               if (!Keys.ExpressionKeys.isSortKey(field, super.getType())) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
 
-               Keys.ExpressionKeys<T> ek;
-               try {
-                       ek = new Keys.ExpressionKeys<T>(new int[]{field}, 
super.getType());
-               } catch(IllegalArgumentException iae) {
-                       throw new InvalidProgramException("Invalid 
specification of field position.", iae);
-               }
+               Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(field, 
super.getType());
                return ek.computeLogicalKeyPositions();
        }
 
        private int[] getFlatFields(String fields) {
 
-               if(super.getType() instanceof CompositeType) {
-
-                       // check selected field is sortable type
-                       TypeInformation<?> sortKeyType = ((CompositeType<?>) 
super.getType()).getTypeAt(fields);
-                       if (!sortKeyType.isSortKeyType()) {
-                               throw new InvalidProgramException("Selected 
sort key is not a sortable type " + sortKeyType);
-                       }
-
-                       // compute flat field positions for (nested) sorting 
fields
-                       Keys.ExpressionKeys<T> ek;
-                       try {
-                               ek = new Keys.ExpressionKeys<T>(new 
String[]{fields}, super.getType());
-                       } catch(IllegalArgumentException iae) {
-                               throw new InvalidProgramException("Invalid 
specification of field expression.", iae);
-                       }
-                       return ek.computeLogicalKeyPositions();
-               } else {
-
-                       fields = fields.trim();
-                       if (!(fields.equals("*") || fields.equals("_"))) {
-                               throw new InvalidProgramException("Output 
sorting of non-composite types can only be defined on the full type. " +
-                                               "Use a field wildcard for that 
(\"*\" or \"_\")");
-                       } else {
-
-                               // check if selected field is sortable type
-                               if (!super.getType().isSortKeyType()) {
-                                       throw new 
InvalidProgramException("Selected sort key cannot be sorted: " + 
super.getType());
-                               }
-
-                               return new int[]{0};
-                       }
+               if (!Keys.ExpressionKeys.isSortKey(fields, super.getType())) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
+
+               Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(fields, 
super.getType());
+               return ek.computeLogicalKeyPositions();
        }
 
        private void appendSorting(int[] flatOrderFields, Order order) {
@@ -189,8 +147,8 @@ public class SortPartitionOperator<T> extends 
SingleInputOperator<T, T, SortPart
                }
 
                // distinguish between partition types
-               UnaryOperatorInformation<T, T> operatorInfo = new 
UnaryOperatorInformation<T, T>(getType(), getType());
-               SortPartitionOperatorBase<T> noop = new  
SortPartitionOperatorBase<T>(operatorInfo, partitionOrdering, name);
+               UnaryOperatorInformation<T, T> operatorInfo = new 
UnaryOperatorInformation<>(getType(), getType());
+               SortPartitionOperatorBase<T> noop = new  
SortPartitionOperatorBase<>(operatorInfo, partitionOrdering, name);
                noop.setInput(input);
                if(this.getParallelism() < 0) {
                        // use parallelism of input if not explicitly specified

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 6092d14..f626e65 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.FirstReducer;
 
@@ -33,7 +32,6 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import com.google.common.base.Preconditions;
@@ -58,17 +56,14 @@ public class SortedGrouping<T> extends Grouping<T> {
         */
        public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order 
order) {
                super(set, keys);
-               
-               if (!dataSet.getType().isTupleType()) {
-                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for tuple data types");
-               }
-               if (field >= dataSet.getType().getArity()) {
-                       throw new IllegalArgumentException("Order key out of 
tuple bounds.");
+
+               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
-               isValidSortKeyType(field);
 
                // use int-based expression key to properly resolve nested 
tuples for grouping
-               ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, 
dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+
                this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
                this.groupSortOrders = new Order[groupSortKeyPositions.length];
                Arrays.fill(this.groupSortOrders, order);
@@ -79,14 +74,14 @@ public class SortedGrouping<T> extends Grouping<T> {
         */
        public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order 
order) {
                super(set, keys);
-               
-               if (!(dataSet.getType() instanceof CompositeType)) {
-                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for composite data types (pojo / tuple / 
case class)");
+
+               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
-               isValidSortKeyType(field);
 
                // resolve String-field to int using the expression keys
-               ExpressionKeys<T> ek = new ExpressionKeys<T>(new 
String[]{field}, dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+
                this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
                this.groupSortOrders = new Order[groupSortKeyPositions.length];
                Arrays.fill(this.groupSortOrders, order); // if field == "*"
@@ -174,7 +169,7 @@ public class SortedGrouping<T> extends Grouping<T> {
                }
                TypeInformation<R> resultType = 
TypeExtractor.getGroupReduceReturnTypes(reducer,
                                this.getDataSet().getType(), 
Utils.getCallLocationName(), true);
-               return new GroupReduceOperator<T, R>(this, resultType, 
dataSet.clean(reducer), Utils.getCallLocationName());
+               return new GroupReduceOperator<>(this, resultType, 
dataSet.clean(reducer), Utils.getCallLocationName());
        }
 
        /**
@@ -196,7 +191,7 @@ public class SortedGrouping<T> extends Grouping<T> {
                TypeInformation<R> resultType = 
TypeExtractor.getGroupCombineReturnTypes(combiner,
                                this.getDataSet().getType(), 
Utils.getCallLocationName(), true);
 
-               return new GroupCombineOperator<T, R>(this, resultType, 
dataSet.clean(combiner), Utils.getCallLocationName());
+               return new GroupCombineOperator<>(this, resultType, 
dataSet.clean(combiner), Utils.getCallLocationName());
        }
 
        
@@ -233,15 +228,12 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (groupSortSelectorFunctionKey != null) {
                        throw new InvalidProgramException("Chaining sortGroup 
with KeySelector sorting is not supported");
                }
-               if (!dataSet.getType().isTupleType()) {
-                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for tuple data types");
-               }
-               if (field >= dataSet.getType().getArity()) {
-                       throw new IllegalArgumentException("Order key out of 
tuple bounds.");
+               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
-               isValidSortKeyType(field);
 
-               ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, 
dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+
                addSortGroupInternal(ek, order);
                return this;
        }
@@ -262,12 +254,12 @@ public class SortedGrouping<T> extends Grouping<T> {
                if (groupSortSelectorFunctionKey != null) {
                        throw new InvalidProgramException("Chaining sortGroup 
with KeySelector sorting is not supported");
                }
-               if (! (dataSet.getType() instanceof CompositeType)) {
-                       throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for composite data types (pojo / tuple / 
case class)");
+               if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
+                       throw new InvalidProgramException("Selected sort key is 
not a sortable type");
                }
-               isValidSortKeyType(field);
 
-               ExpressionKeys<T> ek = new ExpressionKeys<T>(new 
String[]{field}, dataSet.getType());
+               ExpressionKeys<T> ek = new ExpressionKeys<>(field, 
dataSet.getType());
+
                addSortGroupInternal(ek, order);
                return this;
        }
@@ -287,26 +279,4 @@ public class SortedGrouping<T> extends Grouping<T> {
                }
        }
 
-       private void isValidSortKeyType(int field) {
-               TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) 
dataSet.getType()).getTypeAt(field);
-               if (!sortKeyType.isSortKeyType()) {
-                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
-               }
-       }
-
-       private void isValidSortKeyType(String field) {
-               TypeInformation<?> sortKeyType;
-
-               field = field.trim();
-               if(field.equals("*") || field.equals("_")) {
-                       sortKeyType = this.getDataSet().getType();
-               } else {
-                       sortKeyType = ((CompositeType<?>) 
this.getDataSet().getType()).getTypeAt(field);
-               }
-
-               if (!sortKeyType.isSortKeyType()) {
-                       throw new InvalidProgramException("Selected sort key is 
not a sortable type " + sortKeyType);
-               }
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index f32f6a9..90a65e6 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -101,7 +101,7 @@ public class CoGroupOperatorTest {
                ds1.coGroup(ds2).where(0,1).equalTo(2);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testCoGroupKeyFields4() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -112,7 +112,7 @@ public class CoGroupOperatorTest {
                ds1.coGroup(ds2).where(5).equalTo(0);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testCoGroupKeyFields5() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -123,7 +123,7 @@ public class CoGroupOperatorTest {
                ds1.coGroup(ds2).where(-1).equalTo(-1);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = InvalidProgramException.class)
        public void testCoGroupKeyFields6() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -131,7 +131,7 @@ public class CoGroupOperatorTest {
                DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
                // should not work, cogroup key fields on custom type
-               ds1.coGroup(ds2).where(5).equalTo(0);
+               ds1.coGroup(ds2).where(4).equalTo(0);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 5024a0e..0493583 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -63,7 +63,7 @@ public class DataSinkTest {
 
                // should work
                try {
-                       tupleDs.sortPartition(0, 
Order.ANY).writeAsText("/tmp/willNotHappen");
+                       
tupleDs.writeAsText("/tmp/willNotHappen").sortLocalOutput(0, Order.ANY);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -79,9 +79,9 @@ public class DataSinkTest {
 
                // should work
                try {
-                       tupleDs.sortPartition(0, Order.ASCENDING)
-                                       .sortPartition(3, Order.DESCENDING)
-                                       .writeAsText("/tmp/willNotHappen");
+                       tupleDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput(0, Order.ASCENDING)
+                               .sortLocalOutput(3, Order.DESCENDING);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -97,13 +97,13 @@ public class DataSinkTest {
 
                // should work
                try {
-                       tupleDs.sortPartition("f0", 
Order.ANY).writeAsText("/tmp/willNotHappen");
+                       tupleDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("f0", Order.ANY);
                } catch (Exception e) {
                        Assert.fail();
                }
        }
 
-       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
        public void testTupleSingleOrderExpFull() {
 
                final ExecutionEnvironment env = ExecutionEnvironment
@@ -112,7 +112,8 @@ public class DataSinkTest {
                                .fromCollection(emptyTupleData, tupleTypeInfo);
 
                // should not work
-               tupleDs.sortPartition("*", 
Order.ANY).writeAsText("/tmp/willNotHappen");
+               tupleDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("*", Order.ANY);
        }
 
        @Test
@@ -125,9 +126,9 @@ public class DataSinkTest {
 
                // should work
                try {
-                       tupleDs.sortPartition("f1", Order.ASCENDING)
-                                       .sortPartition("f4", Order.DESCENDING)
-                                       .writeAsText("/tmp/willNotHappen");
+                       tupleDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("f1", Order.ASCENDING)
+                               .sortLocalOutput("f4", Order.DESCENDING);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -143,9 +144,9 @@ public class DataSinkTest {
 
                // should work
                try {
-                       tupleDs.sortPartition(4, Order.ASCENDING)
-                                       .sortPartition("f2", Order.DESCENDING)
-                                       .writeAsText("/tmp/willNotHappen");
+                       tupleDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput(4, Order.ASCENDING)
+                               .sortLocalOutput("f2", Order.DESCENDING);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -160,9 +161,9 @@ public class DataSinkTest {
                                .fromCollection(emptyTupleData, tupleTypeInfo);
 
                // must not work
-               tupleDs.sortPartition(3, Order.ASCENDING)
-                               .sortPartition(5, Order.DESCENDING)
-                               .writeAsText("/tmp/willNotHappen");
+               tupleDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput(3, Order.ASCENDING)
+                       .sortLocalOutput(5, Order.DESCENDING);
        }
 
        @Test(expected = CompositeType.InvalidFieldReferenceException.class)
@@ -174,9 +175,9 @@ public class DataSinkTest {
                                .fromCollection(emptyTupleData, tupleTypeInfo);
 
                // must not work
-               tupleDs.sortPartition("notThere", Order.ASCENDING)
-                               .sortPartition("f4", Order.DESCENDING)
-                               .writeAsText("/tmp/willNotHappen");
+               tupleDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("notThere", Order.ASCENDING)
+                       .sortLocalOutput("f4", Order.DESCENDING);
        }
 
        @Test
@@ -189,7 +190,8 @@ public class DataSinkTest {
 
                // should work
                try {
-                       longDs.sortPartition("*", 
Order.ASCENDING).writeAsText("/tmp/willNotHappen");
+                       longDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("*", Order.ASCENDING);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -204,7 +206,8 @@ public class DataSinkTest {
                                .generateSequence(0,2);
 
                // must not work
-               longDs.sortPartition(0, 
Order.ASCENDING).writeAsText("/tmp/willNotHappen");
+               longDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput(0, Order.ASCENDING);
        }
 
        @Test(expected = InvalidProgramException.class)
@@ -216,7 +219,8 @@ public class DataSinkTest {
                                .generateSequence(0,2);
 
                // must not work
-               longDs.sortPartition("0", 
Order.ASCENDING).writeAsText("/tmp/willNotHappen");
+               longDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("0", Order.ASCENDING);
        }
 
        @Test(expected = InvalidProgramException.class)
@@ -228,7 +232,8 @@ public class DataSinkTest {
                                .generateSequence(0,2);
 
                // must not work
-               longDs.sortPartition("nope", 
Order.ASCENDING).writeAsText("/tmp/willNotHappen");
+               longDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("nope", Order.ASCENDING);
        }
 
        @Test
@@ -241,7 +246,8 @@ public class DataSinkTest {
 
                // should work
                try {
-                       pojoDs.sortPartition("myString", 
Order.ASCENDING).writeAsText("/tmp/willNotHappen");
+                       pojoDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("myString", Order.ASCENDING);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -257,9 +263,9 @@ public class DataSinkTest {
 
                // should work
                try {
-                       pojoDs.sortPartition("myLong", Order.ASCENDING)
-                                       .sortPartition("myString", 
Order.DESCENDING)
-                                       .writeAsText("/tmp/willNotHappen");
+                       pojoDs.writeAsText("/tmp/willNotHappen")
+                               .sortLocalOutput("myLong", Order.ASCENDING)
+                               .sortLocalOutput("myString", Order.DESCENDING);
                } catch (Exception e) {
                        Assert.fail();
                }
@@ -274,7 +280,8 @@ public class DataSinkTest {
                                .fromCollection(pojoData);
 
                // must not work
-               pojoDs.sortPartition(1, 
Order.DESCENDING).writeAsText("/tmp/willNotHappen");
+               pojoDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput(1, Order.DESCENDING);
        }
 
        @Test(expected = CompositeType.InvalidFieldReferenceException.class)
@@ -286,12 +293,12 @@ public class DataSinkTest {
                                .fromCollection(pojoData);
 
                // must not work
-               pojoDs.sortPartition("myInt", Order.ASCENDING)
-                               .sortPartition("notThere", Order.DESCENDING)
-                               .writeAsText("/tmp/willNotHappen");
+               pojoDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("myInt", Order.ASCENDING)
+                       .sortLocalOutput("notThere", Order.DESCENDING);
        }
 
-       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
+       @Test(expected = InvalidProgramException.class)
        public void testPojoSingleOrderFull() {
 
                final ExecutionEnvironment env = ExecutionEnvironment
@@ -300,8 +307,8 @@ public class DataSinkTest {
                                .fromCollection(pojoData);
 
                // must not work
-               pojoDs.sortPartition("*", Order.ASCENDING)
-                               .writeAsText("/tmp/willNotHappen");
+               pojoDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("*", Order.ASCENDING);
        }
 
        @Test(expected = InvalidProgramException.class)
@@ -316,8 +323,8 @@ public class DataSinkTest {
                                .fromCollection(arrayData);
 
                // must not work
-               pojoDs.sortPartition("*", Order.ASCENDING)
-                               .writeAsText("/tmp/willNotHappen");
+               pojoDs.writeAsText("/tmp/willNotHappen")
+                       .sortLocalOutput("*", Order.ASCENDING);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index f4bd945..2e9bdf7 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -112,7 +112,7 @@ public class DistinctOperatorTest {
                customDs.distinct();
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testDistinctByKeyFields6() {
 
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
index fe362f0..794f0d4 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -123,7 +123,7 @@ public class FullOuterJoinOperatorTest {
                                .with(new DummyJoin());
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testFullOuter7() {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = 
env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index bdad3db..9220095 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -107,7 +107,7 @@ public class GroupingTest {
                
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testGroupByKeyFields4() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -117,7 +117,7 @@ public class GroupingTest {
                tupleDs.groupBy(5);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testGroupByKeyFields5() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -334,7 +334,7 @@ public class GroupingTest {
                }
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testGroupSortKeyFields2() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index be964cc..ae23382 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -137,7 +137,7 @@ public class JoinOperatorTest {
                ds1.join(ds2).where(0,1).equalTo(2);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testJoinKeyFields4() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -148,7 +148,7 @@ public class JoinOperatorTest {
                ds1.join(ds2).where(5).equalTo(0);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testJoinKeyFields5() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -159,7 +159,7 @@ public class JoinOperatorTest {
                ds1.join(ds2).where(-1).equalTo(-1);
        }
        
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = InvalidProgramException.class)
        public void testJoinKeyFields6() {
                
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -167,7 +167,7 @@ public class JoinOperatorTest {
                DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
                // should not work, join key fields on custom type
-               ds1.join(ds2).where(5).equalTo(0);
+               ds1.join(ds2).where(4).equalTo(0);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
index 06b0c13..cab06c2 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
@@ -124,7 +124,7 @@ public class LeftOuterJoinOperatorTest {
                                .with(new DummyJoin());
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testLeftOuter7() {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = 
env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
index 0e407ca..411edd5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
@@ -123,7 +123,7 @@ public class RightOuterJoinOperatorTest {
                                .with(new DummyJoin());
        }
 
-       @Test(expected = IllegalArgumentException.class)
+       @Test(expected = IndexOutOfBoundsException.class)
        public void testRightOuter7() {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = 
env.fromCollection(emptyTupleData, tupleTypeInfo);

Reply via email to