[FLINK-3303] [core] Move all type utilities to flink-core

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21a71586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21a71586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21a71586

Branch: refs/heads/master
Commit: 21a715867d655bb61df9a9f7eef37e42b99e206a
Parents: 7081836
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jan 31 23:28:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 2 16:55:44 2016 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |   47 +-
 .../apache/flink/api/common/operators/Keys.java |  459 +++++
 .../flink/api/java/functions/KeySelector.java   |   63 +
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   78 +
 .../api/java/typeutils/EitherTypeInfo.java      |  122 ++
 .../flink/api/java/typeutils/EnumTypeInfo.java  |  122 ++
 .../api/java/typeutils/GenericTypeInfo.java     |  116 ++
 .../java/typeutils/InputTypeConfigurable.java   |   42 +
 .../api/java/typeutils/MissingTypeInfo.java     |  121 ++
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  141 ++
 .../flink/api/java/typeutils/PojoField.java     |  108 +
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  406 ++++
 .../api/java/typeutils/ResultTypeQueryable.java |   37 +
 .../flink/api/java/typeutils/TupleTypeInfo.java |  248 +++
 .../api/java/typeutils/TupleTypeInfoBase.java   |  252 +++
 .../flink/api/java/typeutils/TypeExtractor.java | 1692 +++++++++++++++
 .../api/java/typeutils/TypeInfoParser.java      |  383 ++++
 .../flink/api/java/typeutils/ValueTypeInfo.java |  183 ++
 .../api/java/typeutils/WritableTypeInfo.java    |  139 ++
 .../java/typeutils/runtime/AvroSerializer.java  |  201 ++
 .../runtime/CopyableValueComparator.java        |  167 ++
 .../runtime/CopyableValueSerializer.java        |  129 ++
 .../typeutils/runtime/DataInputDecoder.java     |  229 +++
 .../typeutils/runtime/DataInputViewStream.java  |   71 +
 .../typeutils/runtime/DataOutputEncoder.java    |  190 ++
 .../typeutils/runtime/DataOutputViewStream.java |   41 +
 .../typeutils/runtime/EitherSerializer.java     |  193 ++
 .../runtime/GenericTypeComparator.java          |  177 ++
 .../api/java/typeutils/runtime/KryoUtils.java   |   87 +
 .../java/typeutils/runtime/NoFetchingInput.java |  141 ++
 .../java/typeutils/runtime/PojoComparator.java  |  354 ++++
 .../java/typeutils/runtime/PojoSerializer.java  |  592 ++++++
 .../runtime/RuntimeComparatorFactory.java       |   75 +
 .../runtime/RuntimePairComparatorFactory.java   |   44 +
 .../runtime/RuntimeSerializerFactory.java       |  124 ++
 .../typeutils/runtime/Tuple0Serializer.java     |  121 ++
 .../java/typeutils/runtime/TupleComparator.java |  157 ++
 .../typeutils/runtime/TupleComparatorBase.java  |  279 +++
 .../java/typeutils/runtime/TupleSerializer.java |  158 ++
 .../typeutils/runtime/TupleSerializerBase.java  |  102 +
 .../java/typeutils/runtime/ValueComparator.java |  183 ++
 .../java/typeutils/runtime/ValueSerializer.java |  152 ++
 .../typeutils/runtime/WritableComparator.java   |  189 ++
 .../typeutils/runtime/WritableSerializer.java   |  153 ++
 .../typeutils/runtime/kryo/KryoSerializer.java  |  382 ++++
 .../typeutils/runtime/kryo/Serializers.java     |  227 ++
 .../common/operators/ExpressionKeysTest.java    |  481 +++++
 .../operators/SelectorFunctionKeysTest.java     |  154 ++
 .../apache/flink/api/java/tuple/Tuple2Test.java |   44 +
 .../api/java/typeutils/CompositeTypeTest.java   |  179 ++
 .../api/java/typeutils/EitherTypeInfoTest.java  |   61 +
 .../api/java/typeutils/EnumTypeInfoTest.java    |   51 +
 .../api/java/typeutils/GenericTypeInfoTest.java |   47 +
 .../api/java/typeutils/MissingTypeInfoTest.java |   47 +
 .../java/typeutils/ObjectArrayTypeInfoTest.java |   58 +
 .../java/typeutils/PojoTypeExtractionTest.java  |  812 ++++++++
 .../api/java/typeutils/PojoTypeInfoTest.java    |  153 ++
 .../java/typeutils/PojoTypeInformationTest.java |   98 +
 .../api/java/typeutils/TupleTypeInfoTest.java   |   96 +
 .../TypeExtractorInputFormatsTest.java          |  231 +++
 .../api/java/typeutils/TypeExtractorTest.java   | 1907 +++++++++++++++++
 .../api/java/typeutils/TypeInfoParserTest.java  |  338 +++
 .../api/java/typeutils/ValueTypeInfoTest.java   |   87 +
 .../java/typeutils/WritableTypeInfoTest.java    |   74 +
 .../AbstractGenericArraySerializerTest.java     |  187 ++
 .../AbstractGenericTypeComparatorTest.java      |  376 ++++
 .../AbstractGenericTypeSerializerTest.java      |  364 ++++
 .../runtime/AvroGenericArraySerializerTest.java |   28 +
 .../runtime/AvroGenericTypeComparatorTest.java  |   28 +
 .../runtime/AvroGenericTypeSerializerTest.java  |   29 +
 .../runtime/AvroSerializerEmptyArrayTest.java   |  189 ++
 .../runtime/CopyableValueComparatorTest.java    |   53 +
 .../typeutils/runtime/EitherSerializerTest.java |  113 +
 .../runtime/GenericPairComparatorTest.java      |   89 +
 .../MultidimensionalArraySerializerTest.java    |  120 ++
 .../typeutils/runtime/PojoComparatorTest.java   |   63 +
 .../typeutils/runtime/PojoContainingTuple.java  |   44 +
 .../runtime/PojoGenericTypeSerializerTest.java  |   33 +
 .../typeutils/runtime/PojoSerializerTest.java   |  243 +++
 .../runtime/PojoSubclassComparatorTest.java     |   76 +
 .../runtime/PojoSubclassSerializerTest.java     |  196 ++
 .../typeutils/runtime/StringArrayWritable.java  |   83 +
 .../SubclassFromInterfaceSerializerTest.java    |  171 ++
 .../runtime/TestDataOutputSerializer.java       |  308 +++
 .../runtime/TupleComparatorILD2Test.java        |   73 +
 .../runtime/TupleComparatorILD3Test.java        |   75 +
 .../runtime/TupleComparatorILDC3Test.java       |   75 +
 .../runtime/TupleComparatorILDX1Test.java       |   71 +
 .../runtime/TupleComparatorILDXC2Test.java      |   73 +
 .../runtime/TupleComparatorISD1Test.java        |   69 +
 .../runtime/TupleComparatorISD2Test.java        |   73 +
 .../runtime/TupleComparatorISD3Test.java        |   75 +
 .../runtime/TupleComparatorTTT1Test.java        |  139 ++
 .../runtime/TupleComparatorTTT2Test.java        |  145 ++
 .../runtime/TupleComparatorTTT3Test.java        |  154 ++
 .../typeutils/runtime/TupleSerializerTest.java  |  238 +++
 .../runtime/TupleSerializerTestInstance.java    |   79 +
 .../typeutils/runtime/ValueComparatorTest.java  |   53 +
 .../runtime/ValueComparatorUUIDTest.java        |   46 +
 .../api/java/typeutils/runtime/ValueID.java     |   72 +
 .../runtime/ValueSerializerUUIDTest.java        |   50 +
 .../runtime/WritableComparatorTest.java         |   53 +
 .../runtime/WritableComparatorUUIDTest.java     |   46 +
 .../api/java/typeutils/runtime/WritableID.java  |   78 +
 .../runtime/WritableSerializerTest.java         |   50 +
 .../runtime/WritableSerializerUUIDTest.java     |   50 +
 .../runtime/kryo/KryoClearedBufferTest.java     |  287 +++
 .../kryo/KryoGenericArraySerializerTest.java    |   30 +
 .../kryo/KryoGenericTypeComparatorTest.java     |   30 +
 .../kryo/KryoGenericTypeSerializerTest.java     |  168 ++
 .../kryo/KryoWithCustomSerializersTest.java     |   75 +
 .../typeutils/runtime/kryo/SerializersTest.java |  103 +
 .../tuple/base/TupleComparatorTestBase.java     |   43 +
 .../tuple/base/TuplePairComparatorTestBase.java |  109 +
 flink-java/pom.xml                              |   32 +-
 .../java/org/apache/flink/api/java/DataSet.java |    2 +-
 .../java/org/apache/flink/api/java/Utils.java   |   19 -
 .../flink/api/java/functions/KeySelector.java   |   63 -
 .../api/java/functions/SemanticPropUtil.java    |    2 +-
 .../flink/api/java/io/SplitDataProperties.java  |    2 +-
 .../api/java/operators/AggregateOperator.java   |    1 +
 .../api/java/operators/CoGroupOperator.java     |   23 +-
 .../api/java/operators/CoGroupRawOperator.java  |    3 +-
 .../flink/api/java/operators/DataSink.java      |    1 +
 .../api/java/operators/DeltaIteration.java      |    1 +
 .../java/operators/DeltaIterationResultSet.java |    1 +
 .../api/java/operators/DistinctOperator.java    |    7 +-
 .../java/operators/GroupCombineOperator.java    |   11 +-
 .../api/java/operators/GroupReduceOperator.java |   11 +-
 .../flink/api/java/operators/Grouping.java      |    1 +
 .../flink/api/java/operators/JoinOperator.java  |   15 +-
 .../flink/api/java/operators/KeyFunctions.java  |  119 ++
 .../apache/flink/api/java/operators/Keys.java   |  550 -----
 .../api/java/operators/PartitionOperator.java   |    9 +-
 .../api/java/operators/ReduceOperator.java      |    9 +-
 .../java/operators/SortPartitionOperator.java   |    1 +
 .../api/java/operators/SortedGrouping.java      |    3 +-
 .../api/java/operators/UdfOperatorUtils.java    |    1 +
 .../api/java/operators/UnsortedGrouping.java    |    1 +
 .../operators/join/JoinOperatorSetsBase.java    |    2 +-
 .../PlanBothUnwrappingCoGroupOperator.java      |    2 +-
 .../PlanLeftUnwrappingCoGroupOperator.java      |    2 +-
 .../PlanRightUnwrappingCoGroupOperator.java     |    2 +-
 .../PlanUnwrappingGroupCombineOperator.java     |    2 +-
 .../PlanUnwrappingReduceGroupOperator.java      |    2 +-
 .../PlanUnwrappingReduceOperator.java           |    2 +-
 ...lanUnwrappingSortedGroupCombineOperator.java |    2 +-
 ...PlanUnwrappingSortedReduceGroupOperator.java |    2 +-
 .../apache/flink/api/java/sca/UdfAnalyzer.java  |    4 +-
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   78 -
 .../apache/flink/api/java/typeutils/Either.java |  185 --
 .../api/java/typeutils/EitherTypeInfo.java      |  121 --
 .../flink/api/java/typeutils/EnumTypeInfo.java  |  122 --
 .../api/java/typeutils/GenericTypeInfo.java     |  116 --
 .../java/typeutils/InputTypeConfigurable.java   |   42 -
 .../api/java/typeutils/MissingTypeInfo.java     |  121 --
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  141 --
 .../flink/api/java/typeutils/PojoField.java     |  108 -
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  405 ----
 .../api/java/typeutils/ResultTypeQueryable.java |   37 -
 .../flink/api/java/typeutils/TupleTypeInfo.java |  248 ---
 .../api/java/typeutils/TupleTypeInfoBase.java   |  251 ---
 .../flink/api/java/typeutils/TypeExtractor.java | 1687 ---------------
 .../api/java/typeutils/TypeInfoParser.java      |  383 ----
 .../flink/api/java/typeutils/ValueTypeInfo.java |  183 --
 .../api/java/typeutils/WritableTypeInfo.java    |  139 --
 .../java/typeutils/runtime/AvroSerializer.java  |  201 --
 .../runtime/CopyableValueComparator.java        |  167 --
 .../runtime/CopyableValueSerializer.java        |  129 --
 .../typeutils/runtime/DataInputDecoder.java     |  229 ---
 .../typeutils/runtime/DataInputViewStream.java  |   71 -
 .../typeutils/runtime/DataOutputEncoder.java    |  190 --
 .../typeutils/runtime/DataOutputViewStream.java |   41 -
 .../typeutils/runtime/EitherSerializer.java     |  193 --
 .../runtime/GenericTypeComparator.java          |  177 --
 .../api/java/typeutils/runtime/KryoUtils.java   |   87 -
 .../java/typeutils/runtime/NoFetchingInput.java |  141 --
 .../java/typeutils/runtime/PojoComparator.java  |  354 ----
 .../java/typeutils/runtime/PojoSerializer.java  |  592 ------
 .../runtime/RuntimeComparatorFactory.java       |   75 -
 .../runtime/RuntimePairComparatorFactory.java   |   44 -
 .../runtime/RuntimeSerializerFactory.java       |  124 --
 .../typeutils/runtime/Tuple0Serializer.java     |  121 --
 .../java/typeutils/runtime/TupleComparator.java |  157 --
 .../typeutils/runtime/TupleComparatorBase.java  |  279 ---
 .../java/typeutils/runtime/TupleSerializer.java |  158 --
 .../typeutils/runtime/TupleSerializerBase.java  |  102 -
 .../java/typeutils/runtime/ValueComparator.java |  183 --
 .../java/typeutils/runtime/ValueSerializer.java |  152 --
 .../typeutils/runtime/WritableComparator.java   |  189 --
 .../typeutils/runtime/WritableSerializer.java   |  153 --
 .../typeutils/runtime/kryo/KryoSerializer.java  |  382 ----
 .../typeutils/runtime/kryo/Serializers.java     |  229 ---
 .../flink/api/java/TypeExtractionTest.java      |  117 ++
 .../api/java/operators/ExpressionKeysTest.java  |  479 -----
 .../flink/api/java/operators/NamesTest.java     |    4 +-
 .../operators/SelectorFunctionKeysTest.java     |  153 --
 .../flink/api/java/sca/UdfAnalyzerTest.java     |    2 +-
 .../apache/flink/api/java/tuple/Tuple2Test.java |   45 -
 .../type/extractor/PojoTypeExtractionTest.java  |  876 --------
 .../type/extractor/PojoTypeInformationTest.java |   98 -
 .../TypeExtractorInputFormatsTest.java          |  234 ---
 .../java/type/extractor/TypeExtractorTest.java  | 1931 ------------------
 .../api/java/typeutils/CompositeTypeTest.java   |  179 --
 .../api/java/typeutils/EitherTypeInfoTest.java  |   60 -
 .../api/java/typeutils/EnumTypeInfoTest.java    |   51 -
 .../api/java/typeutils/GenericTypeInfoTest.java |   47 -
 .../api/java/typeutils/MissingTypeInfoTest.java |   47 -
 .../java/typeutils/ObjectArrayTypeInfoTest.java |   58 -
 .../api/java/typeutils/PojoTypeInfoTest.java    |  153 --
 .../api/java/typeutils/TupleTypeInfoTest.java   |   96 -
 .../api/java/typeutils/TypeInfoParserTest.java  |  338 ---
 .../api/java/typeutils/ValueTypeInfoTest.java   |   87 -
 .../java/typeutils/WritableTypeInfoTest.java    |   74 -
 .../AbstractGenericArraySerializerTest.java     |  187 --
 .../AbstractGenericTypeComparatorTest.java      |  376 ----
 .../AbstractGenericTypeSerializerTest.java      |  364 ----
 .../runtime/AvroGenericArraySerializerTest.java |   28 -
 .../runtime/AvroGenericTypeComparatorTest.java  |   28 -
 .../runtime/AvroGenericTypeSerializerTest.java  |   29 -
 .../runtime/AvroSerializerEmptyArrayTest.java   |  189 --
 .../runtime/CopyableValueComparatorTest.java    |   53 -
 .../typeutils/runtime/EitherSerializerTest.java |  113 -
 .../runtime/GenericPairComparatorTest.java      |   89 -
 .../MultidimensionalArraySerializerTest.java    |  120 --
 .../typeutils/runtime/PojoComparatorTest.java   |   63 -
 .../typeutils/runtime/PojoContainingTuple.java  |   44 -
 .../runtime/PojoGenericTypeSerializerTest.java  |   33 -
 .../typeutils/runtime/PojoSerializerTest.java   |  243 ---
 .../runtime/PojoSubclassComparatorTest.java     |   76 -
 .../runtime/PojoSubclassSerializerTest.java     |  196 --
 .../typeutils/runtime/StringArrayWritable.java  |   83 -
 .../SubclassFromInterfaceSerializerTest.java    |  171 --
 .../runtime/TestDataOutputSerializer.java       |  308 ---
 .../runtime/TupleComparatorILD2Test.java        |   73 -
 .../runtime/TupleComparatorILD3Test.java        |   75 -
 .../runtime/TupleComparatorILDC3Test.java       |   75 -
 .../runtime/TupleComparatorILDX1Test.java       |   71 -
 .../runtime/TupleComparatorILDXC2Test.java      |   73 -
 .../runtime/TupleComparatorISD1Test.java        |   69 -
 .../runtime/TupleComparatorISD2Test.java        |   73 -
 .../runtime/TupleComparatorISD3Test.java        |   75 -
 .../runtime/TupleComparatorTTT1Test.java        |  139 --
 .../runtime/TupleComparatorTTT2Test.java        |  145 --
 .../runtime/TupleComparatorTTT3Test.java        |  154 --
 .../typeutils/runtime/TupleSerializerTest.java  |  238 ---
 .../runtime/TupleSerializerTestInstance.java    |   79 -
 .../typeutils/runtime/ValueComparatorTest.java  |   53 -
 .../runtime/ValueComparatorUUIDTest.java        |   46 -
 .../api/java/typeutils/runtime/ValueID.java     |   72 -
 .../runtime/ValueSerializerUUIDTest.java        |   50 -
 .../runtime/WritableComparatorTest.java         |   53 -
 .../runtime/WritableComparatorUUIDTest.java     |   46 -
 .../api/java/typeutils/runtime/WritableID.java  |   78 -
 .../runtime/WritableSerializerTest.java         |   50 -
 .../runtime/WritableSerializerUUIDTest.java     |   50 -
 .../runtime/kryo/KryoClearedBufferTest.java     |  287 ---
 .../kryo/KryoGenericArraySerializerTest.java    |   30 -
 .../kryo/KryoGenericTypeComparatorTest.java     |   30 -
 .../kryo/KryoGenericTypeSerializerTest.java     |  168 --
 .../kryo/KryoWithCustomSerializersTest.java     |   75 -
 .../typeutils/runtime/kryo/SerializersTest.java |  103 -
 .../tuple/base/TupleComparatorTestBase.java     |   43 -
 .../tuple/base/TuplePairComparatorTestBase.java |  109 -
 .../flink/python/api/PythonPlanBinder.java      |    2 +-
 .../api/java/table/JavaBatchTranslator.scala    |    5 +-
 .../scala/operators/ScalaAggregateOperator.java |    2 +-
 .../apache/flink/api/scala/CoGroupDataSet.scala |    4 +-
 .../org/apache/flink/api/scala/DataSet.scala    |   21 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |    4 +-
 .../api/scala/UnfinishedCoGroupOperation.scala  |    2 +
 .../apache/flink/api/scala/joinDataSet.scala    |    3 +
 .../api/scala/typeutils/CaseClassTypeInfo.scala |    3 +-
 .../api/scala/unfinishedKeyPairOperation.scala  |    4 +-
 .../streaming/api/datastream/DataStream.java    |    2 +-
 .../streaming/util/keys/KeySelectorUtil.java    |    2 +-
 .../streaming/api/AggregationFunctionTest.java  |    2 +-
 .../scala/operators/CoGroupOperatorTest.scala   |    3 +-
 .../scala/operators/GroupCombineITCase.scala    |   10 +-
 .../api/scala/operators/JoinOperatorTest.scala  |    3 +-
 .../scala/types/TypeInformationGenTest.scala    |    4 +-
 281 files changed, 20419 insertions(+), 20528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index adc9a9b..ba1050c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -40,12 +40,6 @@ under the License.
                        <artifactId>flink-annotations</artifactId>
                        <version>${project.version}</version>
                </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>${shading-artifact.name}</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
                
                <dependency>
                        <groupId>commons-collections</groupId>
@@ -56,20 +50,61 @@ under the License.
                <dependency>
                        <groupId>com.esotericsoftware.kryo</groupId>
                        <artifactId>kryo</artifactId>
+                       <!-- managed version -->
                </dependency>
 
+               <!-- Avro is needed for the interoperability with Avro types for serialization -->
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <!-- managed version -->
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.xerial.snappy</groupId>
+                                       <artifactId>snappy-java</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.commons</groupId>
+                       <artifactId>commons-compress</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Hadoop is only needed here for serialization interoperability with the Writable type -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>${shading-artifact.name}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               
                <dependency>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
                </dependency>
 
+               <!-- test dependencies -->
+               
                <dependency>
                        <groupId>commons-io</groupId>
                        <artifactId>commons-io</artifactId>
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>joda-time</groupId>
+                       <artifactId>joda-time</artifactId>
+                       <version>2.5</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.joda</groupId>
+                       <artifactId>joda-convert</artifactId>
+                       <version>1.7</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
new file mode 100644
index 0000000..6d681de
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import com.google.common.base.Preconditions;
+
+public abstract class Keys<T> {
+
+       public abstract int getNumberOfKeyFields();
+
+       public abstract int[] computeLogicalKeyPositions();
+
+       protected abstract TypeInformation<?>[] getKeyFieldTypes();
+
+       public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo);
+
+       public boolean isEmpty() {
+               return getNumberOfKeyFields() == 0;
+       }
+
+       /**
+        * Check if two sets of keys are compatible with each other (matching types, key counts).
+        */
+       public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException {
+
+               TypeInformation<?>[] thisKeyFieldTypes = this.getKeyFieldTypes();
+               TypeInformation<?>[] otherKeyFieldTypes = other.getKeyFieldTypes();
+
+               if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) {
+                       throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
+               } else {
+                       for (int i = 0; i < thisKeyFieldTypes.length; i++) {
+                               if (!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) {
+                                       throw new IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] );
+                               }
+                       }
+               }
+               return true;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Specializations for expression-based / extractor-based grouping
+       // --------------------------------------------------------------------------------------------
+       
+       
+       public static class SelectorFunctionKeys<T, K> extends Keys<T> {
+
+               private final KeySelector<T, K> keyExtractor;
+               private final TypeInformation<T> inputType;
+               private final TypeInformation<K> keyType;
+               private final List<FlatFieldDescriptor> keyFields;
+
+               public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) {
+
+                       if (keyExtractor == null) {
+                               throw new NullPointerException("Key extractor must not be null.");
+                       }
+                       if (keyType == null) {
+                               throw new NullPointerException("Key type must not be null.");
+                       }
+                       if (!keyType.isKeyType()) {
+                               throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type");
+                       }
+
+                       this.keyExtractor = keyExtractor;
+                       this.inputType = inputType;
+                       this.keyType = keyType;
+
+                       if (keyType instanceof CompositeType) {
+                               this.keyFields = ((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
+                       }
+                       else {
+                               this.keyFields = new ArrayList<>(1);
+                               this.keyFields.add(new FlatFieldDescriptor(0, keyType));
+                       }
+               }
+
+               public TypeInformation<K> getKeyType() {
+                       return keyType;
+               }
+
+               public TypeInformation<T> getInputType() {
+                       return inputType;
+               }
+
+               public KeySelector<T, K> getKeyExtractor() {
+                       return keyExtractor;
+               }
+
+               @Override
+               public int getNumberOfKeyFields() {
+                       return keyFields.size();
+               }
+
+               @Override
+               public int[] computeLogicalKeyPositions() {
+                       int[] logicalKeys = new int[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               logicalKeys[i] = keyFields.get(i).getPosition();
+                       }
+                       return logicalKeys;
+               }
+
+               @Override
+               protected TypeInformation<?>[] getKeyFieldTypes() {
+                       TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               fieldTypes[i] = keyFields.get(i).getType();
+                       }
+                       return fieldTypes;
+               }
+               
+               @Override
+               public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) {
+
+                       if (keyFields.size() != 1) {
+                               throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field.");
+                       }
+                       
+                       if (typeInfo == null) {
+                               // try to extract key type from partitioner
+                               try {
+                                       typeInfo = TypeExtractor.getPartitionerTypes(partitioner);
+                               }
+                               catch (Throwable t) {
+                                       // best effort check, so we ignore exceptions
+                               }
+                       }
+
+                       // only check if type is known and not a generic type
+                       if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) {
+                               // check equality of key and partitioner type
+                               if (!keyType.equals(typeInfo)) {
+                                       throw new InvalidProgramException("The partitioner is incompatible with the key type. "
+                                               + "Partitioner type: " + typeInfo + " , key type: " + keyType);
+                               }
+                       }
+               }
+
+               @Override
+               public String toString() {
+                       return "Key function (Type: " + keyType + ")";
+               }
+       }
+       
+       
+       /**
+        * Represents (nested) field access through string and integer-based keys
+        */
+       public static class ExpressionKeys<T> extends Keys<T> {
+               
+               public static final String SELECT_ALL_CHAR = "*";
+               public static final String SELECT_ALL_CHAR_SCALA = "_";
+               
+               // Flattened fields representing the key fields
+               private List<FlatFieldDescriptor> keyFields;
+
+               /**
+                * ExpressionKeys that is defined by the full data type.
+                */
+               public ExpressionKeys(TypeInformation<T> type) {
+                       this(SELECT_ALL_CHAR, type);
+               }
+
+               /**
+                * Create int-based (non-nested) field position keys on a tuple type.
+                */
+               public ExpressionKeys(int keyPosition, TypeInformation<T> type) {
+                       this(new int[]{keyPosition}, type, false);
+               }
+
+               /**
+                * Create int-based (non-nested) field position keys on a tuple type.
+                */
+               public ExpressionKeys(int[] keyPositions, TypeInformation<T> type) {
+                       this(keyPositions, type, false);
+               }
+
+               /**
+                * Create int-based (non-nested) field position keys on a tuple type.
+                */
+               public ExpressionKeys(int[] keyPositions, TypeInformation<T> type, boolean allowEmpty) {
+
+                       if (!type.isTupleType() || !(type instanceof CompositeType)) {
+                               throw new InvalidProgramException("Specifying keys via field positions is only valid " +
+                                               "for tuple data types. Type: " + type);
+                       }
+                       if (type.getArity() == 0) {
+                               throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+                       }
+                       if (!allowEmpty && (keyPositions == null || keyPositions.length == 0)) {
+                               throw new IllegalArgumentException("The grouping fields must not be empty.");
+                       }
+
+                       this.keyFields = new ArrayList<>();
+
+                       if (keyPositions == null || keyPositions.length == 0) {
+                               // use all tuple fields as key fields
+                               keyPositions = createIncrIntArray(type.getArity());
+                       } else {
+                               rangeCheckFields(keyPositions, type.getArity() - 1);
+                       }
+                       Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
+
+                       // extract key field types
+                       CompositeType<T> cType = (CompositeType<T>)type;
+                       this.keyFields = new ArrayList<>(type.getTotalFields());
+
+                       // for each key position, find all (nested) field types
+                       String[] fieldNames = cType.getFieldNames();
+                       ArrayList<FlatFieldDescriptor> tmpList = new ArrayList<>();
+                       for (int keyPos : keyPositions) {
+                               tmpList.clear();
+                               // get all flat fields
+                               cType.getFlatFields(fieldNames[keyPos], 0, tmpList);
+                               // check if fields are of key type
+                               for(FlatFieldDescriptor ffd : tmpList) {
+                                       if(!ffd.getType().isKeyType()) {
+                                               throw new InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as key.");
+                                       }
+                               }
+                               this.keyFields.addAll(tmpList);
+                       }
+               }
+
+               /**
+                * Create String-based (nested) field expression keys on a composite type.
+                */
+               public ExpressionKeys(String keyExpression, TypeInformation<T> type) {
+                       this(new String[]{keyExpression}, type);
+               }
+
+               /**
+                * Create String-based (nested) field expression keys on a composite type.
+                */
+               public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
+                       Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null.");
+
+                       this.keyFields = new ArrayList<>(keyExpressions.length);
+
+                       if (type instanceof CompositeType){
+                               CompositeType<T> cType = (CompositeType<T>) type;
+
+                               // extract the keys on their flat position
+                               for (String keyExpr : keyExpressions) {
+                                       if (keyExpr == null) {
+                                               throw new InvalidProgramException("Expression key may not be null.");
+                                       }
+                                       // strip off whitespace
+                                       keyExpr = keyExpr.trim();
+
+                                       List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);
+
+                                       if (flatFields.size() == 0) {
+                                               throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
+                                       }
+                                       // check if all nested fields can be used as keys
+                                       for (FlatFieldDescriptor field : flatFields) {
+                                               if (!field.getType().isKeyType()) {
+                                                       throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
+                                               }
+                                       }
+                                       // add flat fields to key fields
+                                       keyFields.addAll(flatFields);
+                               }
+                       }
+                       else {
+                               if (!type.isKeyType()) {
+                                       throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
+                               }
+
+                               // check that all key expressions are valid
+                               for (String keyExpr : keyExpressions) {
+                                       if (keyExpr == null) {
+                                               throw new InvalidProgramException("Expression key may not be null.");
+                                       }
+                                       // strip off whitespace
+                                       keyExpr = keyExpr.trim();
+                                       // check that full type is addressed
+                                       if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
+                                               throw new InvalidProgramException(
+                                                       "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
+                                       }
+                                       // add full type as key
+                                       keyFields.add(new FlatFieldDescriptor(0, type));
+                               }
+                       }
+               }
+               
+               @Override
+               public int getNumberOfKeyFields() {
+                       if(keyFields == null) {
+                               return 0;
+                       }
+                       return keyFields.size();
+               }
+
+               @Override
+               public int[] computeLogicalKeyPositions() {
+                       int[] logicalKeys = new int[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               logicalKeys[i] = keyFields.get(i).getPosition();
+                       }
+                       return logicalKeys;
+               }
+
+               @Override
+               protected TypeInformation<?>[] getKeyFieldTypes() {
+                       TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
+                       for (int i = 0; i < keyFields.size(); i++) {
+                               fieldTypes[i] = keyFields.get(i).getType();
+                       }
+                       return fieldTypes;
+               }
+
+               @Override
+               public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) {
+
+                       if (keyFields.size() != 1) {
+                               throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field.");
+                       }
+
+                       if (typeInfo == null) {
+                               // try to extract key type from partitioner
+                               try {
+                                       typeInfo = TypeExtractor.getPartitionerTypes(partitioner);
+                               }
+                               catch (Throwable t) {
+                                       // best effort check, so we ignore exceptions
+                               }
+                       }
+
+                       if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) {
+                               // only check type compatibility if type is known and not a generic type
+
+                               TypeInformation<?> keyType = keyFields.get(0).getType();
+                               if (!keyType.equals(typeInfo)) {
+                                       throw new InvalidProgramException("The partitioner is incompatible with the key type. "
+                                                                               + "Partitioner type: " + typeInfo + " , key type: " + keyType);
+                               }
+                       }
+               }
+
+               @Override
+               public String toString() {
+                       Joiner join = Joiner.on('.');
+                       return "ExpressionKeys: " + join.join(keyFields);
+               }
+
+               public static boolean isSortKey(int fieldPos, TypeInformation<?> type) {
+
+                       if (!type.isTupleType() || !(type instanceof CompositeType)) {
+                               throw new InvalidProgramException("Specifying keys via field positions is only valid " +
+                                       "for tuple data types. Type: " + type);
+                       }
+                       if (type.getArity() == 0) {
+                               throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+                       }
+
+                       if(fieldPos < 0 || fieldPos >= type.getArity()) {
+                               throw new IndexOutOfBoundsException("Tuple position is out of range: " + fieldPos);
+                       }
+
+                       TypeInformation<?> sortKeyType = ((CompositeType<?>)type).getTypeAt(fieldPos);
+                       return sortKeyType.isSortKeyType();
+               }
+
+               public static boolean isSortKey(String fieldExpr, TypeInformation<?> type) {
+
+                       TypeInformation<?> sortKeyType;
+
+                       fieldExpr = fieldExpr.trim();
+                       if (SELECT_ALL_CHAR.equals(fieldExpr) || SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) {
+                               sortKeyType = type;
+                       }
+                       else {
+                               if (type instanceof CompositeType) {
+                                       sortKeyType = ((CompositeType<?>) type).getTypeAt(fieldExpr);
+                               }
+                               else {
+                                       throw new InvalidProgramException(
+                                               "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic types.");
+                               }
+                       }
+
+                       return sortKeyType.isSortKeyType();
+               }
+
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+
+
+       // --------------------------------------------------------------------------------------------
+       //  Utilities
+       // --------------------------------------------------------------------------------------------
+
+
+       private static int[] createIncrIntArray(int numKeys) {
+               int[] keyFields = new int[numKeys];
+               for (int i = 0; i < numKeys; i++) {
+                       keyFields[i] = i;
+               }
+               return keyFields;
+       }
+
+       private static void rangeCheckFields(int[] fields, int maxAllowedField) {
+
+               for (int f : fields) {
+                       if (f < 0 || f > maxAllowedField) {
+                               throw new IndexOutOfBoundsException("Tuple position is out of range: " + f);
+                       }
+               }
+       }
+
+       public static class IncompatibleKeysException extends Exception {
+               private static final long serialVersionUID = 1L;
+               public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different.";
+               
+               public IncompatibleKeysException(String message) {
+                       super(message);
+               }
+
+               public IncompatibleKeysException(TypeInformation<?> typeInformation, TypeInformation<?> typeInformation2) {
+                       super(typeInformation+" and "+typeInformation2+" are not compatible");
+               }
+       }
+}
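
For orientation, here is a minimal usage sketch of the relocated Keys class (hypothetical example, not part of the commit; it assumes the BasicTypeInfo constants, TupleTypeInfo, and Tuple2 that are available from flink-core):

// Hypothetical sketch: building ExpressionKeys over a Tuple2 type, by position and by field expression.
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class ExpressionKeysSketch {
    public static void main(String[] args) {
        TupleTypeInfo<Tuple2<String, Integer>> type = new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

        // Select the first tuple field as the key, either by position or by field expression.
        ExpressionKeys<Tuple2<String, Integer>> byPosition = new ExpressionKeys<>(0, type);
        ExpressionKeys<Tuple2<String, Integer>> byName = new ExpressionKeys<>("f0", type);

        // Both resolve to a single flat key field at logical position 0.
        System.out.println(byPosition.getNumberOfKeyFields());       // 1
        System.out.println(byName.computeLogicalKeyPositions()[0]);  // 0
    }
}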

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
new file mode 100644
index 0000000..3d06c59
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * The {@link KeySelector} allows the use of arbitrary objects for operations such as
+ * reduce, reduceGroup, join, coGroup, etc.
+ * 
+ * The extractor takes an object and returns the key for that object.
+ *
+ * @param <IN> Type of objects to extract the key from.
+ * @param <KEY> Type of key.
+ */
+public interface KeySelector<IN, KEY> extends Function, Serializable {
+       
+       /**
+        * User-defined function that extracts the key from an arbitrary object.
+        * 
+        * For example for a class:
+        * <pre>
+        *      public class Word {
+        *              String word;
+        *              int count;
+        *      }
+        * </pre>
+        * The key extractor could return the word as
+        * a key to group all Word objects by the String they contain.
+        * 
+        * The code would look like this
+        * <pre>
+        *      public String getKey(Word w) {
+        *              return w.word;
+        *      }
+        * </pre>
+        * 
+        * @param value The object to get the key from.
+        * @return The extracted key.
+        * 
+        * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
+        *                   and trigger recovery or cancellation of the program.
+        */
+       KEY getKey(IN value) throws Exception;
+}
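
As a quick illustration of the interface above (a hypothetical snippet, not part of the commit), a selector for the Word class from the Javadoc could look like this:

import org.apache.flink.api.java.functions.KeySelector;

// The Word class from the Javadoc example above.
class Word {
    String word;
    int count;
}

// Hypothetical key selector: groups Word objects by the String they contain.
class WordKeySelector implements KeySelector<Word, String> {
    @Override
    public String getKey(Word w) {
        return w.word;
    }
}

An instance of WordKeySelector can then be passed wherever the API accepts a KeySelector, for example to key or group a data set of Word objects.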

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..5f74513
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information to generate an AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs).
+ *
+ * Procedure: it uses a regular POJO type analysis and replaces all {@code GenericType<CharSequence>}
+ *     with a {@code GenericType<avro.Utf8>}.
+ * All other types used by Avro are standard Java types.
+ * Only strings are represented as CharSequence fields and as Utf8 classes at runtime.
+ * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
+ * by generic type infos containing Utf8 classes (which are comparable).
+ *
+ * This class is checked by the AvroPojoTest.
+ * @param <T>
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+       public AvroTypeInfo(Class<T> typeClass) {
+               super(typeClass, generateFieldsFromAvroSchema(typeClass));
+       }
+
+       private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
+               PojoTypeExtractor pte = new PojoTypeExtractor();
+               TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null);
+
+               if(!(ti instanceof PojoTypeInfo)) {
+                       throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
+               }
+               PojoTypeInfo pti =  (PojoTypeInfo) ti;
+               List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields());
+
+               for(int i = 0; i < pti.getArity(); i++) {
+                       PojoField f = pti.getPojoFieldAt(i);
+                       TypeInformation newType = f.getTypeInformation();
+                       // check if type is a CharSequence
+                       if(newType instanceof GenericTypeInfo) {
+                               if((newType).getTypeClass().equals(CharSequence.class)) {
+                                       // replace the type by a org.apache.avro.util.Utf8
+                                       newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+                               }
+                       }
+                       PojoField newField = new PojoField(f.getField(), newType);
+                       newFields.add(newField);
+               }
+               return newFields;
+       }
+
+       private static class PojoTypeExtractor extends TypeExtractor {
+               private PojoTypeExtractor() {
+                       super();
+               }
+       }
+}
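
To make the CharSequence-to-Utf8 replacement described above concrete, a hypothetical sketch (the Avro-generated class User is an assumption and not part of this commit):

import org.apache.flink.api.java.typeutils.AvroTypeInfo;
import org.apache.flink.api.java.typeutils.PojoField;

public class AvroTypeInfoSketch {
    public static void main(String[] args) {
        // 'User' stands for any Avro-generated class extending SpecificRecordBase (hypothetical).
        AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);

        // Fields that Avro models as CharSequence are reported as
        // GenericType<org.apache.avro.util.Utf8>, so they can be used in field expressions.
        for (int i = 0; i < typeInfo.getArity(); i++) {
            PojoField field = typeInfo.getPojoFieldAt(i);
            System.out.println(field.getField().getName() + " : " + field.getTypeInformation());
        }
    }
}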

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
new file mode 100644
index 0000000..74d850b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.types.Either;
+
+/**
+ * A {@link TypeInformation} for the {@link Either} type of the Java API.
+ *
+ * @param <L> the Left value type
+ * @param <R> the Right value type
+ */
+public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final TypeInformation<L> leftType;
+
+       private final TypeInformation<R> rightType;
+
+       public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
+               this.leftType = leftType;
+               this.rightType = rightType;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public Class<Either<L, R>> getTypeClass() {
+               return (Class<Either<L, R>>) (Class<?>) Either.class;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) {
+               return new EitherSerializer<L, R>(leftType.createSerializer(config),
+                               rightType.createSerializer(config));
+       }
+
+       @Override
+       public String toString() {
+               return "Either <" + leftType.toString() + ", " + rightType.toString() + ">";
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof EitherTypeInfo) {
+                       EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj;
+
+                       return other.canEqual(this) &&
+                               leftType.equals(other.leftType) &&
+                               rightType.equals(other.rightType);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return 17 * leftType.hashCode() + rightType.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof EitherTypeInfo;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       public TypeInformation<L> getLeftType() {
+               return leftType;
+       }
+
+       public TypeInformation<R> getRightType() {
+               return rightType;
+       }
+
+}
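
A small round-trip sketch for this type information (hypothetical, not part of the commit; it assumes the Either type and the BasicTypeInfo constants already present in flink-core):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.types.Either;

public class EitherTypeInfoSketch {
    public static void main(String[] args) {
        EitherTypeInfo<String, Integer> info = new EitherTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

        // The serializer delegates to the serializers of the Left and Right types.
        TypeSerializer<Either<String, Integer>> serializer =
                info.createSerializer(new ExecutionConfig());

        Either<String, Integer> left = Either.<String, Integer>Left("hello");
        Either<String, Integer> copy = serializer.copy(left);
        System.out.println(copy.left());   // prints "hello"
    }
}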

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
new file mode 100644
index 0000000..de59c36
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.EnumComparator;
+import org.apache.flink.api.common.typeutils.base.EnumSerializer;
+
+/**
+ * A {@link TypeInformation} for Java enumeration types.
+ *
+ * @param <T> The type represented by this type information.
+ */
+public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> 
implements AtomicType<T> {
+
+       private static final long serialVersionUID = 8936740290137178660L;
+       
+       private final Class<T> typeClass;
+
+       public EnumTypeInfo(Class<T> typeClass) {
+               Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
+
+               if (!Enum.class.isAssignableFrom(typeClass)) {
+                       throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
+               }
+
+               this.typeClass = typeClass;
+       }
+
+       @Override
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
+               return new EnumComparator<T>(sortOrderAscending);
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+       
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       public Class<T> getTypeClass() {
+               return this.typeClass;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return true;
+       }
+
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
+               return new EnumSerializer<T>(typeClass);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Standard utils
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "EnumTypeInfo<" + typeClass.getName() + ">";
+       }       
+       
+       @Override
+       public int hashCode() {
+               return typeClass.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof EnumTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof EnumTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
+
+                       return enumTypeInfo.canEqual(this) &&
+                               typeClass == enumTypeInfo.typeClass;
+               } else {
+                       return false;
+               }
+       }
+}

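A hypothetical usage sketch for EnumTypeInfo (the Color enum is invented for illustration): enums are key types, so both a serializer and a comparator can be created.

    // Minimal sketch; the Color enum is a placeholder.
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeComparator;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.EnumTypeInfo;

    public class EnumTypeInfoExample {

        public enum Color { RED, GREEN, BLUE }

        public static void main(String[] args) {
            EnumTypeInfo<Color> info = new EnumTypeInfo<Color>(Color.class);

            TypeSerializer<Color> serializer = info.createSerializer(new ExecutionConfig());
            // isKeyType() is true for enums, so a comparator is available as well.
            TypeComparator<Color> ascending = info.createComparator(true, new ExecutionConfig());

            System.out.println(info); // EnumTypeInfo<...Color>
        }
    }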
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
new file mode 100644
index 0000000..7e7aa68
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+
+public class GenericTypeInfo<T> extends TypeInformation<T> implements 
AtomicType<T> {
+
+       private static final long serialVersionUID = -7959114120287706504L;
+       
+       private final Class<T> typeClass;
+
+       public GenericTypeInfo(Class<T> typeClass) {
+               this.typeClass = Preconditions.checkNotNull(typeClass);
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+       
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       public Class<T> getTypeClass() {
+               return typeClass;
+       }
+       
+       @Override
+       public boolean isKeyType() {
+               return Comparable.class.isAssignableFrom(typeClass);
+       }
+
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+               return new KryoSerializer<T>(this.typeClass, config);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
+               if (isKeyType()) {
+                       @SuppressWarnings("rawtypes")
+                       GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), this.typeClass);
+                       return (TypeComparator<T>) comparator;
+               }
+
+               throw new UnsupportedOperationException("Types that do not implement java.lang.Comparable cannot be used as keys.");
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return typeClass.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof GenericTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof GenericTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       GenericTypeInfo<T> genericTypeInfo = 
(GenericTypeInfo<T>) obj;
+
+                       return typeClass == genericTypeInfo.typeClass;
+               } else {
+                       return false;
+               }
+       }
+       
+       @Override
+       public String toString() {
+               return "GenericType<" + typeClass.getCanonicalName() + ">";
+       }
+}

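A hypothetical sketch for GenericTypeInfo: a class that the type system cannot analyze further is treated as a generic type and serialized with Kryo; because the example class is not Comparable, it cannot be used as a key.

    // Minimal sketch; Blob is a placeholder class.
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;

    public class GenericTypeInfoExample {

        // Not Comparable, so isKeyType() returns false and createComparator(...) would throw.
        public static class Blob {
            public byte[] payload;
        }

        public static void main(String[] args) {
            GenericTypeInfo<Blob> info = new GenericTypeInfo<Blob>(Blob.class);

            // Falls back to Kryo for serialization.
            TypeSerializer<Blob> serializer = info.createSerializer(new ExecutionConfig());

            System.out.println(info.isKeyType()); // false
            System.out.println(info);             // GenericType<...Blob>
        }
    }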
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
new file mode 100644
index 0000000..f8b4247
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be
+ * configured with the data type they will operate on. The method
+ * {@link #setInputType(org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)}
+ * will be called when the output format is used with an output method such as
+ * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+ */
+public interface InputTypeConfigurable {
+
+       /**
+        * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is
+        * passed to the DataSet's output method. May be used to configure the output format based on
+        * the data type.
+        *
+        * @param type The data type of the input.
+        * @param executionConfig The execution config of the job.
+        */
+       void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig);
+}

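As a sketch of how the interface is meant to be used (the format below is a made-up stub, not part of this commit), an OutputFormat can implement InputTypeConfigurable to capture the type of the records it will receive; the OutputFormat method signatures are assumed from flink-core.

    // Minimal sketch; TypeAwareOutputFormat is a hypothetical stub.
    import java.io.IOException;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
    import org.apache.flink.configuration.Configuration;

    public class TypeAwareOutputFormat<T> implements OutputFormat<T>, InputTypeConfigurable {

        private TypeInformation<?> inputType;

        @Override
        public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
            // Called by the framework when this format is passed to DataSet#output(...).
            this.inputType = type;
        }

        @Override
        public void configure(Configuration parameters) {}

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {}

        @Override
        public void writeRecord(T record) throws IOException {
            // A real format could use 'inputType' to pick an encoding for 'record'.
        }

        @Override
        public void close() throws IOException {}
    }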
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
new file mode 100644
index 0000000..1dd7f01
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A special type information signifying that the type extraction failed. It 
contains
+ * additional error information.
+ */
+public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
+
+       private static final long serialVersionUID = -4212082837126702723L;
+       
+       private final String functionName;
+       private final InvalidTypesException typeException;
+
+       
+       public MissingTypeInfo(String functionName) {
+               this(functionName, new InvalidTypesException("An unknown error 
occured."));
+       }
+
+       public MissingTypeInfo(String functionName, InvalidTypesException 
typeException) {
+               this.functionName = functionName;
+               this.typeException = typeException;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public String getFunctionName() {
+               return functionName;
+       }
+
+       public InvalidTypesException getTypeException() {
+               return typeException;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public boolean isBasicType() {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+
+       @Override
+       public boolean isTupleType() {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+
+       @Override
+       public int getArity() {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+
+       @Override
+       public Class<InvalidTypesException> getTypeClass() {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+
+       @Override
+       public boolean isKeyType() {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+
+       @Override
+       public TypeSerializer<InvalidTypesException> 
createSerializer(ExecutionConfig executionConfig) {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getSimpleName() + "<" + functionName + ", " + 
typeException.getMessage() + ">";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof MissingTypeInfo) {
+                       MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
+
+                       return missingTypeInfo.canEqual(this) &&
+                               
functionName.equals(missingTypeInfo.functionName) &&
+                               
typeException.equals(missingTypeInfo.typeException);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * functionName.hashCode() + typeException.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof MissingTypeInfo;
+       }
+
+       @Override
+       public int getTotalFields() {
+               throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
+       }
+}

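A small sketch of the intended behavior (the function name and message are invented): the accessors work, but using a MissingTypeInfo as a regular type information throws.

    // Minimal sketch of MissingTypeInfo's contract.
    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.java.typeutils.MissingTypeInfo;

    public class MissingTypeInfoExample {

        public static void main(String[] args) {
            MissingTypeInfo missing = new MissingTypeInfo(
                    "MyMapFunction", new InvalidTypesException("Type variable could not be resolved."));

            System.out.println(missing.getFunctionName());               // MyMapFunction
            System.out.println(missing.getTypeException().getMessage()); // the extraction error

            try {
                missing.getTypeClass(); // any "real" use of the type information fails
            } catch (UnsupportedOperationException e) {
                System.out.println("Expected: " + e.getMessage());
            }
        }
    }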
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
new file mode 100644
index 0000000..150c976
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Array;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+
+public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
+
+       private static final long serialVersionUID = 1L;
+       
+       private final Class<T> arrayType;
+       private final TypeInformation<C> componentInfo;
+
+       private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> 
componentInfo) {
+               this.arrayType = Preconditions.checkNotNull(arrayType);
+               this.componentInfo = Preconditions.checkNotNull(componentInfo);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public Class<T> getTypeClass() {
+               return arrayType;
+       }
+
+       public TypeInformation<C> getComponentInfo() {
+               return componentInfo;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return false;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
+               return (TypeSerializer<T>) new GenericArraySerializer<C>(
+                       componentInfo.getTypeClass(),
+                       componentInfo.createSerializer(executionConfig));
+       }
+
+       @Override
+       public String toString() {
+               return this.getClass().getSimpleName() + "<" + 
this.componentInfo + ">";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof ObjectArrayTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = 
(ObjectArrayTypeInfo<T, C>)obj;
+
+                       return objectArrayTypeInfo.canEqual(this) &&
+                               arrayType == objectArrayTypeInfo.arrayType &&
+                               
componentInfo.equals(objectArrayTypeInfo.componentInfo);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof ObjectArrayTypeInfo;
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * this.arrayType.hashCode() + 
this.componentInfo.hashCode();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> 
arrayClass, TypeInformation<C> componentInfo) {
+               Preconditions.checkNotNull(arrayClass);
+               Preconditions.checkNotNull(componentInfo);
+               Preconditions.checkArgument(arrayClass.isArray(), "Class " + 
arrayClass + " must be an array.");
+
+               return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
+       }
+
+       /**
+        * Creates a new {@link 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
+        * {@link TypeInformation} for the component type.
+        *
+        * <p>
+        * This must be used in cases where the complete type of the array is 
not available as a
+        * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
+        */
+       @SuppressWarnings("unchecked")
+       public static <T, C> ObjectArrayTypeInfo<T, C> 
getInfoFor(TypeInformation<C> componentInfo) {
+               Preconditions.checkNotNull(componentInfo);
+
+               return new ObjectArrayTypeInfo<T, C>(
+                       
(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
+                       componentInfo);
+       }
+}

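A hypothetical sketch of the two getInfoFor variants above, using BasicTypeInfo.STRING_TYPE_INFO as the component type (BasicTypeInfo is assumed to be available in flink-core).

    // Minimal sketch: type information for a String[] array.
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;

    public class ObjectArrayTypeInfoExample {

        public static void main(String[] args) {
            // Variant 1: only the component type is known; the array class is derived via reflection.
            ObjectArrayTypeInfo<String[], String> fromComponent =
                    ObjectArrayTypeInfo.getInfoFor(BasicTypeInfo.STRING_TYPE_INFO);

            // Variant 2: the array class is known as well.
            ObjectArrayTypeInfo<String[], String> fromClass =
                    ObjectArrayTypeInfo.getInfoFor(String[].class, BasicTypeInfo.STRING_TYPE_INFO);

            // The serializer delegates element handling to the component serializer.
            TypeSerializer<String[]> serializer =
                    fromComponent.createSerializer(new ExecutionConfig());

            System.out.println(fromClass.getComponentInfo()); // String
        }
    }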
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
new file mode 100644
index 0000000..1b008c0
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Represents a field definition for objects of a {@link PojoTypeInfo} type.
+ */
+public class PojoField implements Serializable {
+
+       private static final long serialVersionUID = 1975295846436559363L;
+
+       private transient Field field;
+       private final TypeInformation<?> type;
+
+       public PojoField(Field field, TypeInformation<?> type) {
+               this.field = Preconditions.checkNotNull(field);
+               this.type = Preconditions.checkNotNull(type);
+       }
+
+       public Field getField() {
+               return field;
+       }
+
+       public TypeInformation<?> getTypeInformation() {
+               return type;
+       }
+
+       private void writeObject(ObjectOutputStream out)
+                       throws IOException, ClassNotFoundException {
+               out.defaultWriteObject();
+               out.writeObject(field.getDeclaringClass());
+               out.writeUTF(field.getName());
+       }
+
+       private void readObject(ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+               Class<?> clazz = (Class<?>)in.readObject();
+               String fieldName = in.readUTF();
+               field = null;
+               // try superclasses as well
+               while (clazz != null) {
+                       try {
+                               field = clazz.getDeclaredField(fieldName);
+                               field.setAccessible(true);
+                               break;
+                       } catch (NoSuchFieldException e) {
+                               clazz = clazz.getSuperclass();
+                       }
+               }
+               if (field == null) {
+                       throw new RuntimeException("Class resolved at 
TaskManager is not compatible with class read during Plan setup."
+                                       + " (" + fieldName + ")");
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "PojoField " + field.getDeclaringClass() + "." + 
field.getName() + " (" + type + ")";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof PojoField) {
+                       PojoField other = (PojoField) obj;
+
+                       return other.canEqual(this) && type.equals(other.type) 
&&
+                               Objects.equals(field, other.field);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(field, type);
+       }
+
+       public boolean canEqual(Object obj) {
+               return obj instanceof PojoField;
+       }
+}
\ No newline at end of file

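A sketch of how a PojoField pairs a reflective Field with its TypeInformation (the Person class is made up; in practice these descriptors are typically built by the type extraction logic rather than by hand). BasicTypeInfo is assumed to be available in flink-core.

    // Minimal sketch; Person is a placeholder POJO.
    import java.lang.reflect.Field;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.PojoField;

    public class PojoFieldExample {

        public static class Person {
            public String name;
            public int age;
        }

        public static void main(String[] args) throws NoSuchFieldException {
            Field nameField = Person.class.getField("name");

            // A PojoField combines the reflective field with its Flink type information.
            PojoField pojoField = new PojoField(nameField, BasicTypeInfo.STRING_TYPE_INFO);

            System.out.println(pojoField.getField().getName());  // name
            System.out.println(pojoField.getTypeInformation());  // String
        }
    }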
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
new file mode 100644
index 0000000..cc0d239
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+
+import com.google.common.base.Joiner;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+/**
+ * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
+ * since the conditions are slightly different from Java Beans.
+ * A type is considered a Flink POJO type if it fulfills the conditions below.
+ * <ul>
+ *   <li>It is a public and standalone class (not a non-static inner class).</li>
+ *   <li>It has a public no-argument constructor.</li>
+ *   <li>All fields are either public, or have public getters and setters.</li>
+ * </ul>
+ * 
+ * @param <T> The type represented by this type information.
+ */
+public class PojoTypeInfo<T> extends CompositeType<T> {
+       
+       private static final long serialVersionUID = 1L;
+
+       private final static String REGEX_FIELD = 
"[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+       private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
+       private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
+                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+       private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
+       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
+       private final PojoField[] fields;
+       
+       private final int totalFields;
+
+       public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
+               super(typeClass);
+
+               
Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+                               "POJO " + typeClass + " is not public");
+
+               this.fields = fields.toArray(new PojoField[fields.size()]);
+
+               Arrays.sort(this.fields, new Comparator<PojoField>() {
+                       @Override
+                       public int compare(PojoField o1, PojoField o2) {
+                               return 
o1.getField().getName().compareTo(o2.getField().getName());
+                       }
+               });
+
+               int counterFields = 0;
+
+               for(PojoField field : fields) {
+                       counterFields += 
field.getTypeInformation().getTotalFields();
+               }
+
+               totalFields = counterFields;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return fields.length;
+       }
+       
+       @Override
+       public int getTotalFields() {
+               return totalFields;
+       }
+
+       @Override
+       public boolean isSortKeyType() {
+               // Support for sorting POJOs that implement Comparable is not 
implemented yet.
+               // Since the order of fields in a POJO type is not well 
defined, sorting on fields
+               //   gives only some undefined order.
+               return false;
+       }
+       
+
+       @Override
+       public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
+
+               Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+               if(!matcher.matches()) {
+                       throw new InvalidFieldReferenceException("Invalid POJO 
field reference \""+fieldExpression+"\".");
+               }
+
+               String field = matcher.group(0);
+               if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                       // handle select all
+                       int keyPosition = 0;
+                       for(PojoField pField : fields) {
+                               if(pField.getTypeInformation() instanceof 
CompositeType) {
+                                       CompositeType<?> cType = 
(CompositeType<?>)pField.getTypeInformation();
+                                       
cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
+                                       keyPosition += cType.getTotalFields()-1;
+                               } else {
+                                       result.add(
+                                               new NamedFlatFieldDescriptor(
+                                                       
pField.getField().getName(),
+                                                       offset + keyPosition,
+                                                       
pField.getTypeInformation()));
+                               }
+                               keyPosition++;
+                       }
+                       return;
+               } else {
+                       field = matcher.group(1);
+               }
+
+               // get field
+               int fieldPos = -1;
+               TypeInformation<?> fieldType = null;
+               for (int i = 0; i < fields.length; i++) {
+                       if (fields[i].getField().getName().equals(field)) {
+                               fieldPos = i;
+                               fieldType = fields[i].getTypeInformation();
+                               break;
+                       }
+               }
+               if (fieldPos == -1) {
+                       throw new InvalidFieldReferenceException("Unable to 
find field \""+field+"\" in type "+this+".");
+               }
+               String tail = matcher.group(3);
+               if(tail == null) {
+                       if(fieldType instanceof CompositeType) {
+                               // forward offset
+                               for(int i=0; i<fieldPos; i++) {
+                                       offset += 
this.getTypeAt(i).getTotalFields();
+                               }
+                               // add all fields of composite type
+                               ((CompositeType<?>) 
fieldType).getFlatFields("*", offset, result);
+                       } else {
+                               // we found the field to add
+                               // compute flat field position by adding 
skipped fields
+                               int flatFieldPos = offset;
+                               for(int i=0; i<fieldPos; i++) {
+                                       flatFieldPos += 
this.getTypeAt(i).getTotalFields();
+                               }
+                               result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
+                       }
+               } else {
+                       if(fieldType instanceof CompositeType<?>) {
+                               // forward offset
+                               for(int i=0; i<fieldPos; i++) {
+                                       offset += 
this.getTypeAt(i).getTotalFields();
+                               }
+                               ((CompositeType<?>) 
fieldType).getFlatFields(tail, offset, result);
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
+                       }
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+
+               Matcher matcher = 
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+               if(!matcher.matches()) {
+                       if 
(fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+                               throw new 
InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Invalid format of POJO field expression 
\""+fieldExpression+"\".");
+                       }
+               }
+
+               String field = matcher.group(1);
+               // get field
+               int fieldPos = -1;
+               TypeInformation<?> fieldType = null;
+               for (int i = 0; i < fields.length; i++) {
+                       if (fields[i].getField().getName().equals(field)) {
+                               fieldPos = i;
+                               fieldType = fields[i].getTypeInformation();
+                               break;
+                       }
+               }
+               if (fieldPos == -1) {
+                       throw new InvalidFieldReferenceException("Unable to 
find field \""+field+"\" in type "+this+".");
+               }
+
+               String tail = matcher.group(3);
+               if(tail == null) {
+                       // we found the type
+                       return (TypeInformation<X>) fieldType;
+               } else {
+                       if(fieldType instanceof CompositeType<?>) {
+                               return ((CompositeType<?>) 
fieldType).getTypeAt(tail);
+                       } else {
+                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
+                       }
+               }
+       }
+
+       @Override
+       public <X> TypeInformation<X> getTypeAt(int pos) {
+               if (pos < 0 || pos >= this.fields.length) {
+                       throw new IndexOutOfBoundsException();
+               }
+               @SuppressWarnings("unchecked")
+               TypeInformation<X> typed = (TypeInformation<X>) 
fields[pos].getTypeInformation();
+               return typed;
+       }
+
+       @Override
+       protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+               return new PojoTypeComparatorBuilder();
+       }
+
+       // Used for testing; consider using Mockito here.
+       public PojoField getPojoFieldAt(int pos) {
+               if (pos < 0 || pos >= this.fields.length) {
+                       throw new IndexOutOfBoundsException();
+               }
+               return this.fields[pos];
+       }
+
+       public String[] getFieldNames() {
+               String[] result = new String[fields.length];
+               for (int i = 0; i < fields.length; i++) {
+                       result[i] = fields[i].getField().getName();
+               }
+               return result;
+       }
+
+       @Override
+       public int getFieldIndex(String fieldName) {
+               for (int i = 0; i < fields.length; i++) {
+                       if (fields[i].getField().getName().equals(fieldName)) {
+                               return i;
+                       }
+               }
+               return -1;
+       }
+
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+               if(config.isForceKryoEnabled()) {
+                       return new KryoSerializer<T>(getTypeClass(), config);
+               }
+               if(config.isForceAvroEnabled()) {
+                       return new AvroSerializer<T>(getTypeClass());
+               }
+
+               TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length ];
+               Field[] reflectiveFields = new Field[fields.length];
+
+               for (int i = 0; i < fields.length; i++) {
+                       fieldSerializers[i] = 
fields[i].getTypeInformation().createSerializer(config);
+                       reflectiveFields[i] = fields[i].getField();
+               }
+
+               return new PojoSerializer<T>(getTypeClass(), fieldSerializers, 
reflectiveFields, config);
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof PojoTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj;
+
+                       return pojoTypeInfo.canEqual(this) &&
+                               super.equals(pojoTypeInfo) &&
+                               Arrays.equals(fields, pojoTypeInfo.fields) &&
+                               totalFields == pojoTypeInfo.totalFields;
+               } else {
+                       return false;
+               }
+       }
+       
+       @Override
+       public int hashCode() {
+               return 31 * (31 * Arrays.hashCode(fields) + totalFields) + 
super.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof PojoTypeInfo;
+       }
+       
+       @Override
+       public String toString() {
+               List<String> fieldStrings = new ArrayList<String>();
+               for (PojoField field : fields) {
+                       fieldStrings.add(field.getField().getName() + ": " + 
field.getTypeInformation().toString());
+               }
+               return "PojoType<" + getTypeClass().getName()
+                               + ", fields = [" + Joiner.on(", 
").join(fieldStrings) + "]"
+                               + ">";
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private class PojoTypeComparatorBuilder implements 
TypeComparatorBuilder<T> {
+
+               private ArrayList<TypeComparator> fieldComparators;
+               private ArrayList<Field> keyFields;
+
+               public PojoTypeComparatorBuilder() {
+                       fieldComparators = new ArrayList<TypeComparator>();
+                       keyFields = new ArrayList<Field>();
+               }
+
+
+               @Override
+               public void initializeTypeComparatorBuilder(int size) {
+                       fieldComparators.ensureCapacity(size);
+                       keyFields.ensureCapacity(size);
+               }
+
+               @Override
+               public void addComparatorField(int fieldId, TypeComparator<?> 
comparator) {
+                       fieldComparators.add(comparator);
+                       keyFields.add(fields[fieldId].getField());
+               }
+
+               @Override
+               public TypeComparator<T> createTypeComparator(ExecutionConfig 
config) {
+                       Preconditions.checkState(
+                               keyFields.size() > 0,
+                               "No keys were defined for the 
PojoTypeComparatorBuilder.");
+
+                       Preconditions.checkState(
+                               fieldComparators.size() > 0,
+                               "No type comparators were defined for the 
PojoTypeComparatorBuilder.");
+
+                       Preconditions.checkState(
+                               keyFields.size() == fieldComparators.size(),
+                               "Number of key fields and field comparators is 
not equal.");
+
+                       return new PojoComparator<T>(
+                               keyFields.toArray(new Field[keyFields.size()]),
+                               fieldComparators.toArray(new 
TypeComparator[fieldComparators.size()]),
+                               createSerializer(config),
+                               getTypeClass());
+               }
+       }
+
+       public static class NamedFlatFieldDescriptor extends 
FlatFieldDescriptor {
+
+               private String fieldName;
+
+               public NamedFlatFieldDescriptor(String name, int keyPosition, 
TypeInformation<?> type) {
+                       super(keyPosition, type);
+                       this.fieldName = name;
+               }
+
+               public String getFieldName() {
+                       return fieldName;
+               }
+
+               @Override
+               public String toString() {
+                       return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
+               }
+       }
+}

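A hand-built sketch for PojoTypeInfo (normally the type extraction constructs it; Person is an invented POJO, and BasicTypeInfo is assumed to be available in flink-core). Note that the fields are sorted by name, which determines their positions.

    // Minimal sketch; Person is a placeholder POJO that fulfills the POJO conditions.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.PojoField;
    import org.apache.flink.api.java.typeutils.PojoTypeInfo;

    public class PojoTypeInfoExample {

        // Public, static, public no-argument constructor, public fields.
        public static class Person {
            public String name;
            public int age;

            public Person() {}
        }

        public static void main(String[] args) throws NoSuchFieldException {
            List<PojoField> fields = new ArrayList<PojoField>();
            fields.add(new PojoField(Person.class.getField("name"), BasicTypeInfo.STRING_TYPE_INFO));
            fields.add(new PojoField(Person.class.getField("age"), BasicTypeInfo.INT_TYPE_INFO));

            PojoTypeInfo<Person> info = new PojoTypeInfo<Person>(Person.class, fields);

            System.out.println(info.getArity());            // 2
            System.out.println(info.getFieldIndex("name")); // 1, because fields are sorted by name

            TypeSerializer<Person> serializer = info.createSerializer(new ExecutionConfig());
        }
    }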
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
new file mode 100644
index 0000000..5e0cbed
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * This interface can be implemented by functions and input formats to tell the framework
+ * about their produced data type. It acts as an alternative to the reflection analysis
+ * that is otherwise performed and is useful in situations where the produced data type
+ * may vary depending on parametrization.
+ */
+public interface ResultTypeQueryable<T> {
+
+       /**
+        * Gets the data type (as a {@link TypeInformation}) produced by this 
function or input format.
+        * 
+        * @return The data type produced by this function or input format.
+        */
+       TypeInformation<T> getProducedType();
+}

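A sketch of a function whose output type depends on its parametrization and therefore declares it via ResultTypeQueryable. The Parser interface is hypothetical, and the MapFunction signature is assumed from flink-core.

    // Minimal sketch; ParsingMapper and Parser are made up for illustration.
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

    public class ParsingMapper<OUT> implements MapFunction<String, OUT>, ResultTypeQueryable<OUT> {

        private final TypeInformation<OUT> producedType;
        private final Parser<OUT> parser;

        public ParsingMapper(Parser<OUT> parser, TypeInformation<OUT> producedType) {
            this.parser = parser;
            this.producedType = producedType;
        }

        @Override
        public OUT map(String value) throws Exception {
            return parser.parse(value);
        }

        @Override
        public TypeInformation<OUT> getProducedType() {
            // Returned instead of running reflection-based type extraction.
            return producedType;
        }

        /** Hypothetical parser interface, only for this sketch. */
        public interface Parser<T> extends java.io.Serializable {
            T parse(String value);
        }
    }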