http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 0c0b710..c02d365 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
@@ -42,9 +43,9 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
 import org.apache.flink.api.java.operators.join.JoinType;
 import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
@@ -394,8 +395,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
 
                                @SuppressWarnings("unchecked")
                                SelectorFunctionKeys<I1, K> keys1 = 
(SelectorFunctionKeys<I1, K>)rawKeys1;
-                               TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 
= SelectorFunctionKeys.createTypeWithKey(keys1);
-                               Operator<Tuple2<K, I1>> keyMapper1 = 
SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
+                               TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 
= KeyFunctions.createTypeWithKey(keys1);
+                               Operator<Tuple2<K, I1>> keyMapper1 = 
KeyFunctions.appendKeyExtractor(input1, keys1);
 
                                return this.withInput1(keyMapper1, 
typeInfoWithKey1, rawKeys1);
                        }
@@ -406,8 +407,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
 
                                @SuppressWarnings("unchecked")
                                SelectorFunctionKeys<I2, K> keys2 = 
(SelectorFunctionKeys<I2, K>)rawKeys2;
-                               TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 
= SelectorFunctionKeys.createTypeWithKey(keys2);
-                               Operator<Tuple2<K, I2>> keyMapper2 = 
SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+                               TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 
= KeyFunctions.createTypeWithKey(keys2);
+                               Operator<Tuple2<K, I2>> keyMapper2 = 
KeyFunctions.appendKeyExtractor(input2, keys2);
 
                                return withInput2(keyMapper2, typeInfoWithKey2, 
rawKeys2);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
new file mode 100644
index 0000000..49d598a
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * Static helpers that wrap an operator in a map step which either attaches
+ * the key(s) of a selector-function key to every record, or strips a
+ * previously attached key again.
+ */
+public class KeyFunctions {
+
+       /**
+        * Appends a "Key Extractor" map operator to {@code input} that maps each
+        * {@code T} record to a {@code Tuple2<K, T>} typed as (key type, input type).
+        *
+        * <p>The mapper is configured with the same parallelism as {@code input}.
+        *
+        * @param input the operator whose output is to be keyed
+        * @param key the selector-function key supplying the extractor and the key/input types
+        * @return the map operator producing {@code Tuple2<K, T>} records
+        */
+       public static <T, K> org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor(
+                       org.apache.flink.api.common.operators.Operator<T> input,
+                       SelectorFunctionKeys<T, K> key)
+       {
+
+               TypeInformation<T> inputType = key.getInputType();
+               TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
+               // diamond instead of raw types, so the compiler checks the selector against <T, K>
+               KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<>(key.getKeyExtractor());
+
+               MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper =
+                               new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(
+                                               extractor,
+                                               new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+                                               "Key Extractor"
+                               );
+
+               mapper.setInput(input);
+               mapper.setParallelism(input.getParallelism());
+
+               return mapper;
+       }
+
+       /**
+        * Appends a "Key Extractor" map operator to {@code input} that maps each
+        * {@code T} record to a {@code Tuple3<K1, K2, T>} typed as
+        * (key1's key type, key2's key type, input type).
+        *
+        * <p>The input type is taken from {@code key1}; the mapper is configured
+        * with the same parallelism as {@code input}.
+        *
+        * @param input the operator whose output is to be keyed
+        * @param key1 the selector-function key providing the first extractor and key type
+        * @param key2 the selector-function key providing the second extractor and key type
+        * @return the map operator producing {@code Tuple3<K1, K2, T>} records
+        */
+       @SuppressWarnings("unchecked")
+       public static <T, K1, K2> org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
+                       org.apache.flink.api.common.operators.Operator<T> input,
+                       SelectorFunctionKeys<T, K1> key1,
+                       SelectorFunctionKeys<T, K2> key2)
+       {
+
+               TypeInformation<T> inputType = key1.getInputType();
+               TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
+               TwoKeyExtractingMapper<T, K1, K2> extractor =
+                               new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());
+
+               MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper =
+                               new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>(
+                                               extractor,
+                                               new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+                                               "Key Extractor"
+                               );
+
+               mapper.setInput(input);
+               mapper.setParallelism(input.getParallelism());
+
+               return mapper;
+       }
+
+       /**
+        * Appends a "Key Remover" map operator that maps the {@code Tuple2<K, T>}
+        * records of {@code inputWithKey} back to plain {@code T} records.
+        *
+        * <p>The mapper is configured with the same parallelism as {@code inputWithKey}.
+        *
+        * @param inputWithKey the operator emitting keyed records
+        * @param key the selector-function key describing the key and input types
+        * @return the map operator producing {@code T} records
+        */
+       public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
+                       org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> inputWithKey,
+                       SelectorFunctionKeys<T, K> key)
+       {
+
+               TypeInformation<T> inputType = key.getInputType();
+               TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
+
+               MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper =
+                               new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(
+                                               new KeyRemovingMapper<T, K>(),
+                                               new UnaryOperatorInformation<>(typeInfoWithKey, inputType),
+                                               "Key Remover"
+                               );
+               mapper.setInput(inputWithKey);
+               mapper.setParallelism(inputWithKey.getParallelism());
+
+               return mapper;
+       }
+
+       /**
+        * Builds the tuple type information for records paired with their key.
+        *
+        * @param key the selector-function key supplying the key and input types
+        * @return type information for a {@code Tuple2} with the key type first and the input type second
+        */
+       public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
+                       SelectorFunctionKeys<T, K> key)
+       {
+               return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
+       }
+
+       /**
+        * Builds the tuple type information for records paired with two keys.
+        *
+        * @param key1 the selector-function key supplying the first key type and the input type
+        * @param key2 the selector-function key supplying the second key type
+        * @return type information for a {@code Tuple3} ordered as key1's key type, key2's key type, input type
+        */
+       public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
+                       SelectorFunctionKeys<T, K1> key1,
+                       SelectorFunctionKeys<T, K2> key2)
+       {
+               return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
deleted file mode 100644
index 5992f0b..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
-import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import com.google.common.base.Preconditions;
-
-public abstract class Keys<T> {
-
-       public abstract int getNumberOfKeyFields();
-
-       public abstract int[] computeLogicalKeyPositions();
-
-       protected abstract TypeInformation<?>[] getKeyFieldTypes();
-
-       public abstract <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo);
-
-       public boolean isEmpty() {
-               return getNumberOfKeyFields() == 0;
-       }
-
-       /**
-        * Check if two sets of keys are compatible to each other (matching 
types, key counts)
-        */
-       public boolean areCompatible(Keys<?> other) throws 
IncompatibleKeysException {
-
-               TypeInformation<?>[] thisKeyFieldTypes = 
this.getKeyFieldTypes();
-               TypeInformation<?>[] otherKeyFieldTypes = 
other.getKeyFieldTypes();
-
-               if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) {
-                       throw new 
IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
-               } else {
-                       for (int i = 0; i < thisKeyFieldTypes.length; i++) {
-                               if 
(!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) {
-                                       throw new 
IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] );
-                               }
-                       }
-               }
-               return true;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Specializations for expression-based / extractor-based grouping
-       // 
--------------------------------------------------------------------------------------------
-       
-       
-       public static class SelectorFunctionKeys<T, K> extends Keys<T> {
-
-               private final KeySelector<T, K> keyExtractor;
-               private final TypeInformation<T> inputType;
-               private final TypeInformation<K> keyType;
-               private final List<FlatFieldDescriptor> keyFields;
-
-               public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, 
TypeInformation<T> inputType, TypeInformation<K> keyType) {
-
-                       if (keyExtractor == null) {
-                               throw new NullPointerException("Key extractor 
must not be null.");
-                       }
-                       if (keyType == null) {
-                               throw new NullPointerException("Key type must 
not be null.");
-                       }
-                       if (!keyType.isKeyType()) {
-                               throw new InvalidProgramException("Return type 
"+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key 
type");
-                       }
-
-                       this.keyExtractor = keyExtractor;
-                       this.inputType = inputType;
-                       this.keyType = keyType;
-
-                       if (keyType instanceof CompositeType) {
-                               this.keyFields = 
((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
-                       }
-                       else {
-                               this.keyFields = new ArrayList<>(1);
-                               this.keyFields.add(new FlatFieldDescriptor(0, 
keyType));
-                       }
-               }
-
-               public TypeInformation<K> getKeyType() {
-                       return keyType;
-               }
-
-               public TypeInformation<T> getInputType() {
-                       return inputType;
-               }
-
-               public KeySelector<T, K> getKeyExtractor() {
-                       return keyExtractor;
-               }
-
-               @Override
-               public int getNumberOfKeyFields() {
-                       return keyFields.size();
-               }
-
-               @Override
-               public int[] computeLogicalKeyPositions() {
-                       int[] logicalKeys = new int[keyFields.size()];
-                       for (int i = 0; i < keyFields.size(); i++) {
-                               logicalKeys[i] = keyFields.get(i).getPosition();
-                       }
-                       return logicalKeys;
-               }
-
-               @Override
-               protected TypeInformation<?>[] getKeyFieldTypes() {
-                       TypeInformation<?>[] fieldTypes = new 
TypeInformation[keyFields.size()];
-                       for (int i = 0; i < keyFields.size(); i++) {
-                               fieldTypes[i] = keyFields.get(i).getType();
-                       }
-                       return fieldTypes;
-               }
-               
-               @Override
-               public <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo) {
-
-                       if (keyFields.size() != 1) {
-                               throw new InvalidProgramException("Custom 
partitioners can only be used with keys that have one key field.");
-                       }
-                       
-                       if (typeInfo == null) {
-                               // try to extract key type from partitioner
-                               try {
-                                       typeInfo = 
TypeExtractor.getPartitionerTypes(partitioner);
-                               }
-                               catch (Throwable t) {
-                                       // best effort check, so we ignore 
exceptions
-                               }
-                       }
-
-                       // only check if type is known and not a generic type
-                       if (typeInfo != null && !(typeInfo instanceof 
GenericTypeInfo)) {
-                               // check equality of key and partitioner type
-                               if (!keyType.equals(typeInfo)) {
-                                       throw new InvalidProgramException("The 
partitioner is incompatible with the key type. "
-                                               + "Partitioner type: " + 
typeInfo + " , key type: " + keyType);
-                               }
-                       }
-               }
-
-               @SuppressWarnings("unchecked")
-               public static <T, K> Operator<Tuple2<K, T>> appendKeyExtractor(
-                       Operator<T> input,
-                       SelectorFunctionKeys<T, K> key)
-               {
-
-                       TypeInformation<T> inputType = key.getInputType();
-                       TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
createTypeWithKey(key);
-                       KeyExtractingMapper<T, K> extractor = new 
KeyExtractingMapper(key.getKeyExtractor());
-
-                       MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, 
Tuple2<K, T>>> mapper =
-                               new MapOperatorBase<T, Tuple2<K, T>, 
MapFunction<T, Tuple2<K, T>>>(
-                                       extractor,
-                                       new UnaryOperatorInformation(inputType, 
typeInfoWithKey),
-                                       "Key Extractor"
-                               );
-
-                       mapper.setInput(input);
-                       mapper.setParallelism(input.getParallelism());
-
-                       return mapper;
-               }
-
-               @SuppressWarnings("unchecked")
-               public static <T, K1, K2> Operator<Tuple3<K1, K2, T>> 
appendKeyExtractor(
-                       Operator<T> input,
-                       SelectorFunctionKeys<T, K1> key1,
-                       SelectorFunctionKeys<T, K2> key2)
-               {
-
-                       TypeInformation<T> inputType = key1.getInputType();
-                       TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = 
createTypeWithKey(key1, key2);
-                       TwoKeyExtractingMapper<T, K1, K2> extractor =
-                               new 
TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());
-
-                       MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, 
Tuple3<K1, K2, T>>> mapper =
-                               new MapOperatorBase<T, Tuple3<K1, K2, T>, 
MapFunction<T, Tuple3<K1, K2, T>>>(
-                                       extractor,
-                                       new 
UnaryOperatorInformation<>(inputType, typeInfoWithKey),
-                                       "Key Extractor"
-                               );
-
-                       mapper.setInput(input);
-                       mapper.setParallelism(input.getParallelism());
-
-                       return mapper;
-               }
-
-               public static <T, K> 
org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> 
appendKeyRemover(
-                       Operator<Tuple2<K, T>> inputWithKey,
-                       SelectorFunctionKeys<T, K> key)
-               {
-
-                       TypeInformation<T> inputType = key.getInputType();
-                       TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
createTypeWithKey(key);
-
-                       MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, 
T>, T>> mapper =
-                               new MapOperatorBase<Tuple2<K, T>, T, 
MapFunction<Tuple2<K, T>, T>>(
-                                       new KeyRemovingMapper<T, K>(),
-                                       new 
UnaryOperatorInformation<>(typeInfoWithKey, inputType),
-                                       "Key Remover"
-                               );
-                       mapper.setInput(inputWithKey);
-                       mapper.setParallelism(inputWithKey.getParallelism());
-
-                       return mapper;
-               }
-
-               public static <T, K> TypeInformation<Tuple2<K, T>> 
createTypeWithKey(
-                       SelectorFunctionKeys<T, K> key)
-               {
-                       return new TupleTypeInfo<>(key.getKeyType(), 
key.getInputType());
-               }
-
-               public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> 
createTypeWithKey(
-                       SelectorFunctionKeys<T, K1> key1,
-                       SelectorFunctionKeys<T, K2> key2)
-               {
-                       return new TupleTypeInfo<>(key1.getKeyType(), 
key2.getKeyType(), key1.getInputType());
-               }
-
-               @Override
-               public String toString() {
-                       return "Key function (Type: " + keyType + ")";
-               }
-       }
-       
-       
-       /**
-        * Represents (nested) field access through string and integer-based 
keys
-        */
-       public static class ExpressionKeys<T> extends Keys<T> {
-               
-               public static final String SELECT_ALL_CHAR = "*";
-               public static final String SELECT_ALL_CHAR_SCALA = "_";
-               
-               // Flattened fields representing keys fields
-               private List<FlatFieldDescriptor> keyFields;
-
-               /**
-                * ExpressionKeys that is defined by the full data type.
-                */
-               public ExpressionKeys(TypeInformation<T> type) {
-                       this(SELECT_ALL_CHAR, type);
-               }
-
-               /**
-                * Create int-based (non-nested) field position keys on a tuple 
type.
-                */
-               public ExpressionKeys(int keyPosition, TypeInformation<T> type) 
{
-                       this(new int[]{keyPosition}, type, false);
-               }
-
-               /**
-                * Create int-based (non-nested) field position keys on a tuple 
type.
-                */
-               public ExpressionKeys(int[] keyPositions, TypeInformation<T> 
type) {
-                       this(keyPositions, type, false);
-               }
-
-               /**
-                * Create int-based (non-nested) field position keys on a tuple 
type.
-                */
-               public ExpressionKeys(int[] keyPositions, TypeInformation<T> 
type, boolean allowEmpty) {
-
-                       if (!type.isTupleType() || !(type instanceof 
CompositeType)) {
-                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid " +
-                                               "for tuple data types. Type: " 
+ type);
-                       }
-                       if (type.getArity() == 0) {
-                               throw new InvalidProgramException("Tuple size 
must be greater than 0. Size: " + type.getArity());
-                       }
-                       if (!allowEmpty && (keyPositions == null || 
keyPositions.length == 0)) {
-                               throw new IllegalArgumentException("The 
grouping fields must not be empty.");
-                       }
-
-                       this.keyFields = new ArrayList<>();
-
-                       if (keyPositions == null || keyPositions.length == 0) {
-                               // use all tuple fields as key fields
-                               keyPositions = 
createIncrIntArray(type.getArity());
-                       } else {
-                               rangeCheckFields(keyPositions, type.getArity() 
- 1);
-                       }
-                       Preconditions.checkArgument(keyPositions.length > 0, 
"Grouping fields can not be empty at this point");
-
-                       // extract key field types
-                       CompositeType<T> cType = (CompositeType<T>)type;
-                       this.keyFields = new ArrayList<>(type.getTotalFields());
-
-                       // for each key position, find all (nested) field types
-                       String[] fieldNames = cType.getFieldNames();
-                       ArrayList<FlatFieldDescriptor> tmpList = new 
ArrayList<>();
-                       for (int keyPos : keyPositions) {
-                               tmpList.clear();
-                               // get all flat fields
-                               cType.getFlatFields(fieldNames[keyPos], 0, 
tmpList);
-                               // check if fields are of key type
-                               for(FlatFieldDescriptor ffd : tmpList) {
-                                       if(!ffd.getType().isKeyType()) {
-                                               throw new 
InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as 
key.");
-                                       }
-                               }
-                               this.keyFields.addAll(tmpList);
-                       }
-               }
-
-               /**
-                * Create String-based (nested) field expression keys on a 
composite type.
-                */
-               public ExpressionKeys(String keyExpression, TypeInformation<T> 
type) {
-                       this(new String[]{keyExpression}, type);
-               }
-
-               /**
-                * Create String-based (nested) field expression keys on a 
composite type.
-                */
-               public ExpressionKeys(String[] keyExpressions, 
TypeInformation<T> type) {
-                       Preconditions.checkNotNull(keyExpressions, "Field 
expression cannot be null.");
-
-                       this.keyFields = new ArrayList<>(keyExpressions.length);
-
-                       if (type instanceof CompositeType){
-                               CompositeType<T> cType = (CompositeType<T>) 
type;
-
-                               // extract the keys on their flat position
-                               for (String keyExpr : keyExpressions) {
-                                       if (keyExpr == null) {
-                                               throw new 
InvalidProgramException("Expression key may not be null.");
-                                       }
-                                       // strip off whitespace
-                                       keyExpr = keyExpr.trim();
-
-                                       List<FlatFieldDescriptor> flatFields = 
cType.getFlatFields(keyExpr);
-
-                                       if (flatFields.size() == 0) {
-                                               throw new 
InvalidProgramException("Unable to extract key from expression '" + keyExpr + 
"' on key " + cType);
-                                       }
-                                       // check if all nested fields can be 
used as keys
-                                       for (FlatFieldDescriptor field : 
flatFields) {
-                                               if 
(!field.getType().isKeyType()) {
-                                                       throw new 
InvalidProgramException("This type (" + field.getType() + ") cannot be used as 
key.");
-                                               }
-                                       }
-                                       // add flat fields to key fields
-                                       keyFields.addAll(flatFields);
-                               }
-                       }
-                       else {
-                               if (!type.isKeyType()) {
-                                       throw new InvalidProgramException("This 
type (" + type + ") cannot be used as key.");
-                               }
-
-                               // check that all key expressions are valid
-                               for (String keyExpr : keyExpressions) {
-                                       if (keyExpr == null) {
-                                               throw new 
InvalidProgramException("Expression key may not be null.");
-                                       }
-                                       // strip off whitespace
-                                       keyExpr = keyExpr.trim();
-                                       // check that full type is addressed
-                                       if (!(SELECT_ALL_CHAR.equals(keyExpr) 
|| SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
-                                               throw new 
InvalidProgramException(
-                                                       "Field expression must 
be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for 
non-composite types.");
-                                       }
-                                       // add full type as key
-                                       keyFields.add(new 
FlatFieldDescriptor(0, type));
-                               }
-                       }
-               }
-               
-               @Override
-               public int getNumberOfKeyFields() {
-                       if(keyFields == null) {
-                               return 0;
-                       }
-                       return keyFields.size();
-               }
-
-               @Override
-               public int[] computeLogicalKeyPositions() {
-                       int[] logicalKeys = new int[keyFields.size()];
-                       for (int i = 0; i < keyFields.size(); i++) {
-                               logicalKeys[i] = keyFields.get(i).getPosition();
-                       }
-                       return logicalKeys;
-               }
-
-               @Override
-               protected TypeInformation<?>[] getKeyFieldTypes() {
-                       TypeInformation<?>[] fieldTypes = new 
TypeInformation[keyFields.size()];
-                       for (int i = 0; i < keyFields.size(); i++) {
-                               fieldTypes[i] = keyFields.get(i).getType();
-                       }
-                       return fieldTypes;
-               }
-
-               @Override
-               public <E> void validateCustomPartitioner(Partitioner<E> 
partitioner, TypeInformation<E> typeInfo) {
-
-                       if (keyFields.size() != 1) {
-                               throw new InvalidProgramException("Custom 
partitioners can only be used with keys that have one key field.");
-                       }
-
-                       if (typeInfo == null) {
-                               // try to extract key type from partitioner
-                               try {
-                                       typeInfo = 
TypeExtractor.getPartitionerTypes(partitioner);
-                               }
-                               catch (Throwable t) {
-                                       // best effort check, so we ignore 
exceptions
-                               }
-                       }
-
-                       if (typeInfo != null && !(typeInfo instanceof 
GenericTypeInfo)) {
-                               // only check type compatibility if type is 
known and not a generic type
-
-                               TypeInformation<?> keyType = 
keyFields.get(0).getType();
-                               if (!keyType.equals(typeInfo)) {
-                                       throw new InvalidProgramException("The 
partitioner is incompatible with the key type. "
-                                                                               
+ "Partitioner type: " + typeInfo + " , key type: " + keyType);
-                               }
-                       }
-               }
-
-               @Override
-               public String toString() {
-                       Joiner join = Joiner.on('.');
-                       return "ExpressionKeys: " + join.join(keyFields);
-               }
-
-               public static boolean isSortKey(int fieldPos, 
TypeInformation<?> type) {
-
-                       if (!type.isTupleType() || !(type instanceof 
CompositeType)) {
-                               throw new InvalidProgramException("Specifying 
keys via field positions is only valid " +
-                                       "for tuple data types. Type: " + type);
-                       }
-                       if (type.getArity() == 0) {
-                               throw new InvalidProgramException("Tuple size 
must be greater than 0. Size: " + type.getArity());
-                       }
-
-                       if(fieldPos < 0 || fieldPos >= type.getArity()) {
-                               throw new IndexOutOfBoundsException("Tuple 
position is out of range: " + fieldPos);
-                       }
-
-                       TypeInformation<?> sortKeyType = 
((CompositeType<?>)type).getTypeAt(fieldPos);
-                       return sortKeyType.isSortKeyType();
-               }
-
-               public static boolean isSortKey(String fieldExpr, 
TypeInformation<?> type) {
-
-                       TypeInformation<?> sortKeyType;
-
-                       fieldExpr = fieldExpr.trim();
-                       if (SELECT_ALL_CHAR.equals(fieldExpr) || 
SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) {
-                               sortKeyType = type;
-                       }
-                       else {
-                               if (type instanceof CompositeType) {
-                                       sortKeyType = ((CompositeType<?>) 
type).getTypeAt(fieldExpr);
-                               }
-                               else {
-                                       throw new InvalidProgramException(
-                                               "Field expression must be equal 
to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic 
types.");
-                               }
-                       }
-
-                       return sortKeyType.isSortKeyType();
-               }
-
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Utilities
-       // 
--------------------------------------------------------------------------------------------
-
-
-       private static int[] createIncrIntArray(int numKeys) {
-               int[] keyFields = new int[numKeys];
-               for (int i = 0; i < numKeys; i++) {
-                       keyFields[i] = i;
-               }
-               return keyFields;
-       }
-
-       private static void rangeCheckFields(int[] fields, int maxAllowedField) 
{
-
-               for (int f : fields) {
-                       if (f < 0 || f > maxAllowedField) {
-                               throw new IndexOutOfBoundsException("Tuple 
position is out of range: " + f);
-                       }
-               }
-       }
-
-       public static class IncompatibleKeysException extends Exception {
-               private static final long serialVersionUID = 1L;
-               public static final String SIZE_MISMATCH_MESSAGE = "The number 
of specified keys is different.";
-               
-               public IncompatibleKeysException(String message) {
-                       super(message);
-               }
-
-               public IncompatibleKeysException(TypeInformation<?> 
typeInformation, TypeInformation<?> typeInformation2) {
-                       super(typeInformation+" and "+typeInformation2+" are 
not compatible");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index 1384ca2..96931b0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -21,13 +21,14 @@ package org.apache.flink.api.java.operators;
 import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
@@ -150,9 +151,9 @@ public class PartitionOperator<T> extends 
SingleInputOperator<T, T, PartitionOpe
                Partitioner<?> customPartitioner)
        {
                final SelectorFunctionKeys<T, K> keys = 
(SelectorFunctionKeys<T, K>) rawKeys;
-               TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
SelectorFunctionKeys.createTypeWithKey(keys);
+               TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
KeyFunctions.createTypeWithKey(keys);
 
-               Operator<Tuple2<K, T>> keyedInput = 
SelectorFunctionKeys.appendKeyExtractor(input, keys);
+               Operator<Tuple2<K, T>> keyedInput = 
KeyFunctions.appendKeyExtractor(input, keys);
 
                PartitionOperatorBase<Tuple2<K, T>> keyedPartitionedInput =
                        new PartitionOperatorBase<>(new 
UnaryOperatorInformation<>(typeInfoWithKey, typeInfoWithKey), pMethod, new 
int[]{0}, name);
@@ -160,7 +161,7 @@ public class PartitionOperator<T> extends 
SingleInputOperator<T, T, PartitionOpe
                keyedPartitionedInput.setCustomPartitioner(customPartitioner);
                keyedPartitionedInput.setParallelism(partitionDop);
 
-               return 
SelectorFunctionKeys.appendKeyRemover(keyedPartitionedInput, keys);
+               return KeyFunctions.appendKeyRemover(keyedPartitionedInput, 
keys);
        }
 
        

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 6f8877f..a22a262 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -19,13 +19,14 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
@@ -161,13 +162,13 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                @SuppressWarnings("unchecked")
                final SelectorFunctionKeys<T, K> keys = 
(SelectorFunctionKeys<T, K>) rawKeys;
                
-               TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
SelectorFunctionKeys.createTypeWithKey(keys);
-               Operator<Tuple2<K, T>> keyedInput = 
SelectorFunctionKeys.appendKeyExtractor(input, keys);
+               TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
KeyFunctions.createTypeWithKey(keys);
+               Operator<Tuple2<K, T>> keyedInput = 
KeyFunctions.appendKeyExtractor(input, keys);
                
                PlanUnwrappingReduceOperator<T, K> reducer = new 
PlanUnwrappingReduceOperator<>(function, keys, name, inputType, 
typeInfoWithKey);
                reducer.setInput(keyedInput);
                reducer.setParallelism(parallelism);
 
-               return SelectorFunctionKeys.appendKeyRemover(reducer, keys);
+               return KeyFunctions.appendKeyRemover(reducer, keys);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
index d65bc68..c8a8684 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 07d0b9a..2453f1b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
@@ -31,7 +32,7 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
index c55d919..6144975 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.CodeAnalysisMode;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.sca.CodeAnalyzerException;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 5b0a368..4e6f6ff 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
index e14e06d..734456c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
 import org.apache.flink.api.java.operators.JoinOperator.EquiJoin;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
index fb74d1e..f620e11 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
index 2c62732..2307c0c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index 56f34cc..30e28eb 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index c8e40ce..95b5840 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 72b79a4..21f15d4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 2aa8d54..e85bb79 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index e52a5c4..46773fa 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 278d706..f0e8055 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java 
b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
index 1f4b44a..93cba33 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java
@@ -32,8 +32,8 @@ import 
org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.sca.TaggedValue.Input;
 import org.objectweb.asm.Type;
 import org.objectweb.asm.tree.MethodNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 5f74513..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs)
- *
- * Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>}
- *     with a {@code GenericType<avro.Utf8>}.
- * All other types used by Avro are standard Java types.
- * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime.
- * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
- * by generic type infos containing Utf8 classes (which are comparable),
- *
- * This class is checked by the AvroPojoTest.
- * @param <T>
- */
-public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
-       public AvroTypeInfo(Class<T> typeClass) {
-               super(typeClass, generateFieldsFromAvroSchema(typeClass));
-       }
-
-       private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
-               PojoTypeExtractor pte = new PojoTypeExtractor();
-               TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null);
-
-               if(!(ti instanceof PojoTypeInfo)) {
-                       throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
-               }
-               PojoTypeInfo pti =  (PojoTypeInfo) ti;
-               List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields());
-
-               for(int i = 0; i < pti.getArity(); i++) {
-                       PojoField f = pti.getPojoFieldAt(i);
-                       TypeInformation newType = f.getTypeInformation();
-                       // check if type is a CharSequence
-                       if(newType instanceof GenericTypeInfo) {
-                               if((newType).getTypeClass().equals(CharSequence.class)) {
-                                       // replace the type by a org.apache.avro.util.Utf8
-                                       newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
-                               }
-                       }
-                       PojoField newField = new PojoField(f.getField(), newType);
-                       newFields.add(newField);
-               }
-               return newFields;
-       }
-
-       private static class PojoTypeExtractor extends TypeExtractor {
-               private PojoTypeExtractor() {
-                       super();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
deleted file mode 100644
index 8382831..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-/**
- * This type represents a value of one two possible types, Left or Right (a
- * disjoint union), inspired by Scala's Either type.
- *
- * @param <L>
- *            the type of Left
- * @param <R>
- *            the type of Right
- */
-public abstract class Either<L, R> {
-
-       /**
-        * Create a Left value of Either
-        */
-       public static <L, R> Either<L, R> Left(L value) {
-               return new Left<L, R>(value);
-       }
-
-       /**
-        * Create a Right value of Either
-        */
-       public static <L, R> Either<L, R> Right(R value) {
-               return new Right<L, R>(value);
-       }
-
-       /**
-        * Retrieve the Left value of Either.
-        * 
-        * @return the Left value
-        * @throws IllegalStateException
-        *             if called on a Right
-        */
-       public abstract L left() throws IllegalStateException;
-
-       /**
-        * Retrieve the Right value of Either.
-        * 
-        * @return the Right value
-        * @throws IllegalStateException
-        *             if called on a Left
-        */
-       public abstract R right() throws IllegalStateException;
-
-       /**
-        * 
-        * @return true if this is a Left value, false if this is a Right value
-        */
-       public final boolean isLeft() {
-               return getClass() == Left.class;
-       }
-
-       /**
-        * 
-        * @return true if this is a Right value, false if this is a Left value
-        */
-       public final boolean isRight() {
-               return getClass() == Right.class;
-       }
-
-       /**
-        * A left value of {@link Either}
-        *
-        * @param <L>
-        *            the type of Left
-        * @param <R>
-        *            the type of Right
-        */
-       public static class Left<L, R> extends Either<L, R> {
-               private final L value;
-
-               public Left(L value) {
-                       this.value = java.util.Objects.requireNonNull(value);
-               }
-
-               @Override
-               public L left() {
-                       return value;
-               }
-
-               @Override
-               public R right() {
-                       throw new IllegalStateException("Cannot retrieve Right value on a Left");
-               }
-
-               @Override
-               public boolean equals(Object object) {
-                       if (object instanceof Left<?, ?>) {
-                               final Left<?, ?> other = (Left<?, ?>) object;
-                               return value.equals(other.value);
-                       }
-                       return false;
-               }
-
-               @Override
-               public int hashCode() {
-                       return value.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return "Left(" + value.toString() + ")";
-               }
-
-               /**
-                * Creates a left value of {@link Either}
-                * 
-                */
-               public static <L, R> Left<L, R> of(L left) {
-                       return new Left<L, R>(left);
-               }
-       }
-
-       /**
-        * A right value of {@link Either}
-        *
-        * @param <L>
-        *            the type of Left
-        * @param <R>
-        *            the type of Right
-        */
-       public static class Right<L, R> extends Either<L, R> {
-               private final R value;
-
-               public Right(R value) {
-                       this.value = java.util.Objects.requireNonNull(value);
-               }
-
-               @Override
-               public L left() {
-                       throw new IllegalStateException("Cannot retrieve Left value on a Right");
-               }
-
-               @Override
-               public R right() {
-                       return value;
-               }
-
-               @Override
-               public boolean equals(Object object) {
-                       if (object instanceof Right<?, ?>) {
-                               final Right<?, ?> other = (Right<?, ?>) object;
-                               return value.equals(other.value);
-                       }
-                       return false;
-               }
-
-               @Override
-               public int hashCode() {
-                       return value.hashCode();
-               }
-
-               @Override
-               public String toString() {
-                       return "Right(" + value.toString() + ")";
-               }
-
-               /**
-                * Creates a right value of {@link Either}
-                * 
-                */
-               public static <L, R> Right<L, R> of(R right) {
-                       return new Right<L, R>(right);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
deleted file mode 100644
index ec7be97..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
-
-/**
- * A {@link TypeInformation} for the {@link Either} type of the Java API.
- *
- * @param <L> the Left value type
- * @param <R> the Right value type
- */
-public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final TypeInformation<L> leftType;
-
-       private final TypeInformation<R> rightType;
-
-       public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
-               this.leftType = leftType;
-               this.rightType = rightType;
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-
-       @Override
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Class<Either<L, R>> getTypeClass() {
-               return (Class<Either<L, R>>) (Class<?>) Either.class;
-       }
-
-       @Override
-       public boolean isKeyType() {
-               return false;
-       }
-
-       @Override
-       public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) {
-               return new EitherSerializer<L, R>(leftType.createSerializer(config),
-                               rightType.createSerializer(config));
-       }
-
-       @Override
-       public String toString() {
-               return "Either <" + leftType.toString() + ", " + rightType.toString() + ">";
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof EitherTypeInfo) {
-                       EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj;
-
-                       return other.canEqual(this) &&
-                               leftType.equals(other.leftType) &&
-                               rightType.equals(other.rightType);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return 17 * leftType.hashCode() + rightType.hashCode();
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof EitherTypeInfo;
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       public TypeInformation<L> getLeftType() {
-               return leftType;
-       }
-
-       public TypeInformation<R> getRightType() {
-               return rightType;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
deleted file mode 100644
index de59c36..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.EnumComparator;
-import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-
-/**
- * A {@link TypeInformation} for java enumeration types. 
- *
- * @param <T> The type represented by this type information.
- */
-public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> {
-
-       private static final long serialVersionUID = 8936740290137178660L;
-       
-       private final Class<T> typeClass;
-
-       public EnumTypeInfo(Class<T> typeClass) {
-               Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
-
-               if (!Enum.class.isAssignableFrom(typeClass) ) {
-                       throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
-               }
-
-               this.typeClass = typeClass;
-       }
-
-       @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-               return new EnumComparator<T>(sortOrderAscending);
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-       
-       @Override
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @Override
-       public Class<T> getTypeClass() {
-               return this.typeClass;
-       }
-
-       @Override
-       public boolean isKeyType() {
-               return true;
-       }
-
-       @Override
-       public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-               return new EnumSerializer<T>(typeClass);
-       }
-       
-       // ------------------------------------------------------------------------
-       //  Standard utils
-       // ------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "EnumTypeInfo<" + typeClass.getName() + ">";
-       }       
-       
-       @Override
-       public int hashCode() {
-               return typeClass.hashCode();
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof EnumTypeInfo;
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof EnumTypeInfo) {
-                       @SuppressWarnings("unchecked")
-                       EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
-
-                       return enumTypeInfo.canEqual(this) &&
-                               typeClass == enumTypeInfo.typeClass;
-               } else {
-                       return false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
deleted file mode 100644
index 7e7aa68..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-
-public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
-
-       private static final long serialVersionUID = -7959114120287706504L;
-       
-       private final Class<T> typeClass;
-
-       public GenericTypeInfo(Class<T> typeClass) {
-               this.typeClass = Preconditions.checkNotNull(typeClass);
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-       
-       @Override
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @Override
-       public Class<T> getTypeClass() {
-               return typeClass;
-       }
-       
-       @Override
-       public boolean isKeyType() {
-               return Comparable.class.isAssignableFrom(typeClass);
-       }
-
-       @Override
-       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-               return new KryoSerializer<T>(this.typeClass, config);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-               if (isKeyType()) {
-                       @SuppressWarnings("rawtypes")
-                       GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), this.typeClass);
-                       return (TypeComparator<T>) comparator;
-               }
-
-               throw new UnsupportedOperationException("Types that do not implement java.lang.Comparable cannot be used as keys.");
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               return typeClass.hashCode();
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof GenericTypeInfo;
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof GenericTypeInfo) {
-                       @SuppressWarnings("unchecked")
-                       GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj;
-
-                       return typeClass == genericTypeInfo.typeClass;
-               } else {
-                       return false;
-               }
-       }
-       
-       @Override
-       public String toString() {
-               return "GenericType<" + typeClass.getCanonicalName() + ">";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
deleted file mode 100644
index f8b4247..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured
- * with the data type they will operate on. The method {@link #setInputType(org.apache.flink.api
- * .common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)} will be
- * called when the output format is used with an output method such as
- * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
- */
-public interface InputTypeConfigurable {
-
-       /**
-        * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is passed to
-        * the DataSet's output method. May be used to configures the output format based on the data type.
-        *
-        * @param type The data type of the input.
-        * @param executionConfig
-        */
-       void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
deleted file mode 100644
index 1dd7f01..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-/**
- * A special type information signifying that the type extraction failed. It contains
- * additional error information.
- */
-public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
-
-       private static final long serialVersionUID = -4212082837126702723L;
-       
-       private final String functionName;
-       private final InvalidTypesException typeException;
-
-       
-       public MissingTypeInfo(String functionName) {
-               this(functionName, new InvalidTypesException("An unknown error occured."));
-       }
-
-       public MissingTypeInfo(String functionName, InvalidTypesException typeException) {
-               this.functionName = functionName;
-               this.typeException = typeException;
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       public String getFunctionName() {
-               return functionName;
-       }
-
-       public InvalidTypesException getTypeException() {
-               return typeException;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean isBasicType() {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-
-       @Override
-       public boolean isTupleType() {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-
-       @Override
-       public int getArity() {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-
-       @Override
-       public Class<InvalidTypesException> getTypeClass() {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-
-       @Override
-       public boolean isKeyType() {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-
-       @Override
-       public TypeSerializer<InvalidTypesException> createSerializer(ExecutionConfig executionConfig) {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">";
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof MissingTypeInfo) {
-                       MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
-
-                       return missingTypeInfo.canEqual(this) &&
-                               functionName.equals(missingTypeInfo.functionName) &&
-                               typeException.equals(missingTypeInfo.typeException);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return 31 * functionName.hashCode() + typeException.hashCode();
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof MissingTypeInfo;
-       }
-
-       @Override
-       public int getTotalFields() {
-               throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
deleted file mode 100644
index 150c976..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.lang.reflect.Array;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-
-public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> arrayType;
-       private final TypeInformation<C> componentInfo;
-
-       private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> 
componentInfo) {
-               this.arrayType = Preconditions.checkNotNull(arrayType);
-               this.componentInfo = Preconditions.checkNotNull(componentInfo);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-
-       @Override
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Class<T> getTypeClass() {
-               return arrayType;
-       }
-
-       public TypeInformation<C> getComponentInfo() {
-               return componentInfo;
-       }
-
-       @Override
-       public boolean isKeyType() {
-               return false;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
-               return (TypeSerializer<T>) new GenericArraySerializer<C>(
-                       componentInfo.getTypeClass(),
-                       componentInfo.createSerializer(executionConfig));
-       }
-
-       @Override
-       public String toString() {
-               return this.getClass().getSimpleName() + "<" + 
this.componentInfo + ">";
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof ObjectArrayTypeInfo) {
-                       @SuppressWarnings("unchecked")
-                       ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = 
(ObjectArrayTypeInfo<T, C>)obj;
-
-                       return objectArrayTypeInfo.canEqual(this) &&
-                               arrayType == objectArrayTypeInfo.arrayType &&
-                               
componentInfo.equals(objectArrayTypeInfo.componentInfo);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof ObjectArrayTypeInfo;
-       }
-
-       @Override
-       public int hashCode() {
-               return 31 * this.arrayType.hashCode() + 
this.componentInfo.hashCode();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> 
arrayClass, TypeInformation<C> componentInfo) {
-               Preconditions.checkNotNull(arrayClass);
-               Preconditions.checkNotNull(componentInfo);
-               Preconditions.checkArgument(arrayClass.isArray(), "Class " + 
arrayClass + " must be an array.");
-
-               return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
-       }
-
-       /**
-        * Creates a new {@link 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
-        * {@link TypeInformation} for the component type.
-        *
-        * <p>
-        * This must be used in cases where the complete type of the array is 
not available as a
-        * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
-        */
-       @SuppressWarnings("unchecked")
-       public static <T, C> ObjectArrayTypeInfo<T, C> 
getInfoFor(TypeInformation<C> componentInfo) {
-               Preconditions.checkNotNull(componentInfo);
-
-               return new ObjectArrayTypeInfo<T, C>(
-                       
(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
-                       componentInfo);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
deleted file mode 100644
index 1b008c0..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * Represent a field definition for {@link PojoTypeInfo} type of objects.
- */
-public class PojoField implements Serializable {
-
-       private static final long serialVersionUID = 1975295846436559363L;
-
-       private transient Field field;
-       private final TypeInformation<?> type;
-
-       public PojoField(Field field, TypeInformation<?> type) {
-               this.field = Preconditions.checkNotNull(field);
-               this.type = Preconditions.checkNotNull(type);
-       }
-
-       public Field getField() {
-               return field;
-       }
-
-       public TypeInformation<?> getTypeInformation() {
-               return type;
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               out.writeObject(field.getDeclaringClass());
-               out.writeUTF(field.getName());
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               Class<?> clazz = (Class<?>)in.readObject();
-               String fieldName = in.readUTF();
-               field = null;
-               // try superclasses as well
-               while (clazz != null) {
-                       try {
-                               field = clazz.getDeclaredField(fieldName);
-                               field.setAccessible(true);
-                               break;
-                       } catch (NoSuchFieldException e) {
-                               clazz = clazz.getSuperclass();
-                       }
-               }
-               if (field == null) {
-                       throw new RuntimeException("Class resolved at 
TaskManager is not compatible with class read during Plan setup."
-                                       + " (" + fieldName + ")");
-               }
-       }
-
-       @Override
-       public String toString() {
-               return "PojoField " + field.getDeclaringClass() + "." + 
field.getName() + " (" + type + ")";
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof PojoField) {
-                       PojoField other = (PojoField) obj;
-
-                       return other.canEqual(this) && type.equals(other.type) 
&&
-                               Objects.equals(field, other.field);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return Objects.hash(field, type);
-       }
-
-       public boolean canEqual(Object obj) {
-               return obj instanceof PojoField;
-       }
-}
\ No newline at end of file

Reply via email to