http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
deleted file mode 100644
index f7e4e42..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-
-import com.google.common.base.Joiner;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-/**
- * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
- * since the conditions are slightly different from Java Beans.
- * A type is considered a Flink POJO type, if it fulfills the conditions below.
- * <ul>
- *   <li>It is a public class, and standalone (not a non-static inner class)</li>
- *   <li>It has a public no-argument constructor.</li>
- *   <li>All fields are either public, or have public getters and setters.</li>
- * </ul>
- * 
- * @param <T> The type represented by this type information.
- */
-public class PojoTypeInfo<T> extends CompositeType<T> {
-       
-       private static final long serialVersionUID = 1L;
-
-       // A single field name: a letter, '_' or '$', followed by letters, digits, '_' or '$'.
-       private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
-       // A field name (group 1), optionally followed by '.' and a nested expression (group 3).
-       private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
-       // Same as above, but additionally accepts either select-all wildcard on its own.
-       private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
-                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
-                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
-
-       private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
-       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
-
-       // The POJO's fields; the constructor sorts this array by field name.
-       private final PojoField[] fields;
-       
-       // Sum of getTotalFields() over all field types, computed once in the constructor.
-       private final int totalFields;
-
-       /**
-        * Creates the type information for a POJO class from its analyzed fields.
-        *
-        * @param typeClass The POJO class; it must be public.
-        * @param fields The fields of the POJO. They are copied into an array sorted by field name.
-        */
-       public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
-               super(typeClass);
-
-               final boolean isPublicClass = Modifier.isPublic(typeClass.getModifiers());
-               Preconditions.checkArgument(isPublicClass, "POJO " + typeClass + " is not public");
-
-               // keep a private copy of the fields, ordered by field name
-               final Comparator<PojoField> byFieldName = new Comparator<PojoField>() {
-                       @Override
-                       public int compare(PojoField left, PojoField right) {
-                               final String leftName = left.getField().getName();
-                               final String rightName = right.getField().getName();
-                               return leftName.compareTo(rightName);
-                       }
-               };
-               final PojoField[] sortedFields = fields.toArray(new PojoField[fields.size()]);
-               Arrays.sort(sortedFields, byFieldName);
-               this.fields = sortedFields;
-
-               // the flat width of the POJO is the sum of the flat widths of its fields
-               int flatFieldCount = 0;
-               for (PojoField pojoField : fields) {
-                       flatFieldCount += pojoField.getTypeInformation().getTotalFields();
-               }
-               this.totalFields = flatFieldCount;
-       }
-
-       /**
-        * Reports that a POJO type is not a basic type.
-        *
-        * @return Always {@code false}.
-        */
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-
-       /**
-        * Reports that a POJO type is not a tuple type.
-        *
-        * @return Always {@code false}.
-        */
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       /**
-        * Returns the number of direct fields of the POJO; nested fields are not expanded.
-        *
-        * @return The length of the field array.
-        */
-       @Override
-       public int getArity() {
-               return fields.length;
-       }
-       
-       /**
-        * Returns the total field count that was computed in the constructor by summing
-        * {@code getTotalFields()} over the type information of every field.
-        *
-        * @return The precomputed total number of fields.
-        */
-       @Override
-       public int getTotalFields() {
-               return totalFields;
-       }
-
-       /**
-        * Reports that this type is not usable as a sort key.
-        *
-        * @return Always {@code false}.
-        */
-       @Override
-       public boolean isSortKeyType() {
-               // Support for sorting POJOs that implement Comparable is not implemented yet.
-               // Since the order of fields in a POJO type is not well defined, sorting on fields
-               //   gives only some undefined order.
-               return false;
-       }
-       
-
-       /**
-        * Resolves a field expression to the flat field descriptors it selects and appends them to
-        * {@code result}.
-        *
-        * <p>The expression is either one of the select-all wildcards, a field name, or a field name
-        * followed by a dot and a nested expression that is passed on to the field's composite type.
-        *
-        * @param fieldExpression The expression to resolve.
-        * @param offset The flat position at which this type starts.
-        * @param result The list that receives the resolved descriptors.
-        * @throws InvalidFieldReferenceException If the expression is malformed, names an unknown
-        *         field, or applies a nested expression to a non-composite field.
-        */
-       @Override
-       public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-
-               final Matcher expression = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
-               if (!expression.matches()) {
-                       throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\".");
-               }
-
-               final String wholeMatch = expression.group(0);
-               if (wholeMatch.equals(ExpressionKeys.SELECT_ALL_CHAR) || wholeMatch.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
-                       // select all: emit every field in order, expanding composite fields in place
-                       int nextPosition = offset;
-                       for (PojoField pojoField : fields) {
-                               if (!(pojoField.getTypeInformation() instanceof CompositeType)) {
-                                       result.add(
-                                               new NamedFlatFieldDescriptor(
-                                                       pojoField.getField().getName(),
-                                                       nextPosition,
-                                                       pojoField.getTypeInformation()));
-                                       nextPosition += 1;
-                               } else {
-                                       CompositeType<?> nested = (CompositeType<?>) pojoField.getTypeInformation();
-                                       nested.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), nextPosition, result);
-                                       // a composite field occupies as many flat positions as it has total fields
-                                       nextPosition += nested.getTotalFields();
-                               }
-                       }
-                       return;
-               }
-
-               // not a wildcard: group 1 holds the name of the field on this level
-               final String fieldName = expression.group(1);
-
-               int fieldPos = -1;
-               TypeInformation<?> fieldType = null;
-               int candidate = 0;
-               while (candidate < fields.length && fieldPos == -1) {
-                       if (fields[candidate].getField().getName().equals(fieldName)) {
-                               fieldPos = candidate;
-                               fieldType = fields[candidate].getTypeInformation();
-                       }
-                       candidate++;
-               }
-               if (fieldPos == -1) {
-                       throw new InvalidFieldReferenceException("Unable to find field \""+fieldName+"\" in type "+this+".");
-               }
-
-               final String tail = expression.group(3);
-               final boolean isComposite = fieldType instanceof CompositeType;
-               if (tail != null && !isComposite) {
-                       throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
-               }
-
-               // flat position of the selected field: skip the flat fields of all preceding fields
-               int flatFieldPos = offset;
-               for (int i = 0; i < fieldPos; i++) {
-                       flatFieldPos += this.getTypeAt(i).getTotalFields();
-               }
-
-               if (isComposite) {
-                       // without a tail, all fields of the composite type are added
-                       final String nestedExpression = (tail == null) ? "*" : tail;
-                       ((CompositeType<?>) fieldType).getFlatFields(nestedExpression, flatFieldPos, result);
-               } else {
-                       // we found the field to add
-                       result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
-               }
-       }
-
-       /**
-        * Returns the type of the (possibly nested) field that the given expression points to.
-        *
-        * @param fieldExpression A field name, optionally followed by a dot and a nested expression.
-        *        Expressions that do not match the nested-field pattern are rejected.
-        * @return The type information of the referenced field.
-        * @throws InvalidFieldReferenceException If the expression is malformed, names an unknown
-        *         field, or applies a nested expression to a non-composite field.
-        */
-       @SuppressWarnings("unchecked")
-       @Override
-       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-
-               final Matcher expression = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
-               if (!expression.matches()) {
-                       final boolean startsWithWildcard =
-                               fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) ||
-                               fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA);
-                       if (startsWithWildcard) {
-                               throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
-                       }
-                       throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\".");
-               }
-
-               // group 1 holds the name of the field on this level
-               final String fieldName = expression.group(1);
-
-               int fieldPos = -1;
-               TypeInformation<?> fieldType = null;
-               int candidate = 0;
-               while (candidate < fields.length && fieldPos == -1) {
-                       if (fields[candidate].getField().getName().equals(fieldName)) {
-                               fieldPos = candidate;
-                               fieldType = fields[candidate].getTypeInformation();
-                       }
-                       candidate++;
-               }
-               if (fieldPos == -1) {
-                       throw new InvalidFieldReferenceException("Unable to find field \""+fieldName+"\" in type "+this+".");
-               }
-
-               final String tail = expression.group(3);
-               if (tail == null) {
-                       // no nested expression: the field itself is the answer
-                       return (TypeInformation<X>) fieldType;
-               }
-               if (!(fieldType instanceof CompositeType<?>)) {
-                       throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
-               }
-               return ((CompositeType<?>) fieldType).getTypeAt(tail);
-       }
-
-       /**
-        * Returns the type information of the field at the given position.
-        *
-        * @param pos The index into the field array.
-        * @return The type information of that field.
-        * @throws IndexOutOfBoundsException If {@code pos} is negative or not smaller than the number of fields.
-        */
-       @Override
-       public <X> TypeInformation<X> getTypeAt(int pos) {
-               final boolean inRange = pos >= 0 && pos < this.fields.length;
-               if (inRange) {
-                       @SuppressWarnings("unchecked")
-                       TypeInformation<X> fieldType = (TypeInformation<X>) fields[pos].getTypeInformation();
-                       return fieldType;
-               }
-               throw new IndexOutOfBoundsException();
-       }
-
-       /**
-        * Creates a fresh {@code PojoTypeComparatorBuilder} bound to this type information.
-        *
-        * @return A new, empty comparator builder.
-        */
-       @Override
-       protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
-               return new PojoTypeComparatorBuilder();
-       }
-
-       /**
-        * Returns the POJO field at the given position. Used for testing (maybe use mockito here).
-        *
-        * @param pos The index into the field array.
-        * @return The field stored at that index.
-        * @throws IndexOutOfBoundsException If {@code pos} is negative or not smaller than the number of fields.
-        */
-       public PojoField getPojoFieldAt(int pos) {
-               final boolean inRange = pos >= 0 && pos < this.fields.length;
-               if (!inRange) {
-                       throw new IndexOutOfBoundsException();
-               }
-               final PojoField selected = this.fields[pos];
-               return selected;
-       }
-
-       /**
-        * Returns the names of all fields, in the order of the internal field array.
-        *
-        * @return A newly allocated array with one name per field.
-        */
-       public String[] getFieldNames() {
-               final String[] names = new String[fields.length];
-               int slot = 0;
-               for (PojoField pojoField : fields) {
-                       names[slot] = pojoField.getField().getName();
-                       slot++;
-               }
-               return names;
-       }
-
-       /**
-        * Looks up the position of the field with the given name.
-        *
-        * @param fieldName The name of the field to find.
-        * @return The index of the first field with that name, or {@code -1} if there is none.
-        */
-       @Override
-       public int getFieldIndex(String fieldName) {
-               int index = 0;
-               for (PojoField pojoField : fields) {
-                       if (pojoField.getField().getName().equals(fieldName)) {
-                               return index;
-                       }
-                       index++;
-               }
-               return -1;
-       }
-
-       /**
-        * Creates the serializer for this POJO type.
-        *
-        * <p>If the configuration forces Kryo, a {@code KryoSerializer} is returned; otherwise, if it
-        * forces Avro, an {@code AvroSerializer}. In all other cases a {@code PojoSerializer} is built
-        * from the serializers of the individual fields.
-        *
-        * @param config The execution configuration that selects and parameterizes the serializer.
-        * @return The serializer for the POJO type.
-        */
-       @Override
-       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-               if (config.isForceKryoEnabled()) {
-                       return new KryoSerializer<T>(getTypeClass(), config);
-               }
-               if (config.isForceAvroEnabled()) {
-                       return new AvroSerializer<T>(getTypeClass());
-               }
-
-               final int fieldCount = fields.length;
-               final TypeSerializer<?>[] serializers = new TypeSerializer<?>[fieldCount];
-               final Field[] javaFields = new Field[fieldCount];
-
-               // one serializer and one reflective field handle per POJO field, at matching indexes
-               int slot = 0;
-               for (PojoField pojoField : fields) {
-                       serializers[slot] = pojoField.getTypeInformation().createSerializer(config);
-                       javaFields[slot] = pojoField.getField();
-                       slot++;
-               }
-
-               return new PojoSerializer<T>(getTypeClass(), serializers, javaFields, config);
-       }
-       
-       /**
-        * Compares this type information with another object.
-        *
-        * @param obj The object to compare with.
-        * @return {@code true} if {@code obj} is a {@code PojoTypeInfo} that can equal this one, the
-        *         superclass considers both equal, and both have equal fields and total field counts.
-        */
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof PojoTypeInfo)) {
-                       return false;
-               }
-
-               @SuppressWarnings("unchecked")
-               PojoTypeInfo<T> that = (PojoTypeInfo<T>) obj;
-
-               return that.canEqual(this) &&
-                       super.equals(that) &&
-                       Arrays.equals(fields, that.fields) &&
-                       totalFields == that.totalFields;
-       }
-       
-       /**
-        * Computes a hash code from the fields, the total field count and the superclass hash code.
-        *
-        * @return The combined hash code.
-        */
-       @Override
-       public int hashCode() {
-               int hash = Arrays.hashCode(fields);
-               hash = 31 * hash + totalFields;
-               hash = 31 * hash + super.hashCode();
-               return hash;
-       }
-
-       /**
-        * Tells whether the given object may be compared for equality with this type information.
-        *
-        * @param obj The candidate object.
-        * @return {@code true} if {@code obj} is an instance of {@code PojoTypeInfo}.
-        */
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof PojoTypeInfo;
-       }
-       
-       /**
-        * Renders the POJO class name together with a "name: type" entry for every field.
-        *
-        * @return A string of the form {@code PojoType<className, fields = [a: TypeA, b: TypeB]>}.
-        */
-       @Override
-       public String toString() {
-               final List<String> entries = new ArrayList<String>();
-               for (PojoField pojoField : fields) {
-                       final String entry = pojoField.getField().getName() + ": " + pojoField.getTypeInformation().toString();
-                       entries.add(entry);
-               }
-
-               final String className = getTypeClass().getName();
-               final String joinedEntries = Joiner.on(", ").join(entries);
-               return "PojoType<" + className + ", fields = [" + joinedEntries + "]" + ">";
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Collects key fields and their comparators and assembles them into a {@code PojoComparator}
-        * for the enclosing POJO type.
-        */
-       private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> {
-
-               // raw TypeComparator: the comparators are later copied into a TypeComparator[] array
-               private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
-               private final ArrayList<Field> keyFields = new ArrayList<Field>();
-
-               /**
-                * Reserves capacity for the expected number of key fields.
-                *
-                * @param size The expected number of comparator fields.
-                */
-               @Override
-               public void initializeTypeComparatorBuilder(int size) {
-                       fieldComparators.ensureCapacity(size);
-                       keyFields.ensureCapacity(size);
-               }
-
-               /**
-                * Registers a key field together with the comparator for it.
-                *
-                * @param fieldId The index of the key field in the enclosing type's field array.
-                * @param comparator The comparator for that field.
-                */
-               @Override
-               public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
-                       fieldComparators.add(comparator);
-                       keyFields.add(fields[fieldId].getField());
-               }
-
-               /**
-                * Builds the comparator from everything registered so far.
-                *
-                * @param config The execution configuration used to create the POJO serializer.
-                * @return A {@code PojoComparator} over the registered key fields.
-                * @throws IllegalStateException If no key field was registered.
-                */
-               @Override
-               public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
-                       final int keyCount = keyFields.size();
-                       final int comparatorCount = fieldComparators.size();
-
-                       Preconditions.checkState(
-                               keyCount > 0,
-                               "No keys were defined for the PojoTypeComparatorBuilder.");
-
-                       Preconditions.checkState(
-                               comparatorCount > 0,
-                               "No type comparators were defined for the PojoTypeComparatorBuilder.");
-
-                       Preconditions.checkState(
-                               keyCount == comparatorCount,
-                               "Number of key fields and field comparators is not equal.");
-
-                       final Field[] keys = keyFields.toArray(new Field[keyCount]);
-                       final TypeComparator[] comparators = fieldComparators.toArray(new TypeComparator[comparatorCount]);
-
-                       return new PojoComparator<T>(
-                               keys,
-                               comparators,
-                               createSerializer(config),
-                               getTypeClass());
-               }
-       }
-
-       /**
-        * A {@code FlatFieldDescriptor} that additionally carries the name of the field it describes.
-        */
-       public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor {
-
-               private final String fieldName;
-
-               /**
-                * @param name The name of the described field.
-                * @param keyPosition The flat position of the field.
-                * @param type The type information of the field.
-                */
-               public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation<?> type) {
-                       super(keyPosition, type);
-                       this.fieldName = name;
-               }
-
-               /**
-                * @return The name of the described field.
-                */
-               public String getFieldName() {
-                       return fieldName;
-               }
-
-               @Override
-               public String toString() {
-                       final StringBuilder text = new StringBuilder("NamedFlatFieldDescriptor [name=");
-                       text.append(fieldName);
-                       text.append(" position=").append(getPosition());
-                       text.append(" typeInfo=").append(getType());
-                       text.append("]");
-                       return text.toString();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
deleted file mode 100644
index 22a18f1..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * This interface can be implemented by functions and input formats to tell the framework
- * about their produced data type. This interface acts as an alternative to the reflection analysis
- * that is otherwise performed and is useful in situations where the produced data type may vary
- * depending on parameterization.
- *
- * @param <T> The data type produced by the implementing function or input format.
- */
-public interface ResultTypeQueryable<T> {
-
-       /**
-        * Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
-        * 
-        * @return The data type produced by this function or input format.
-        */
-       TypeInformation<T> getProducedType();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
deleted file mode 100644
index 30710e5..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.*;
-import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer;
-//CHECKSTYLE.ON: AvoidStarImport
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.types.Value;
-
-/**
- * A {@link TypeInformation} for the tuple types of the Java API.
- *
- * @param <T> The type of the tuple.
- */
-public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> 
{
-       
-       private static final long serialVersionUID = 1L;
-
-       protected final String[] fieldNames;
-
-       /**
-        * Creates a tuple type information whose tuple class is looked up from the number of given
-        * field types via {@code Tuple.getTupleClass}.
-        *
-        * @param types The type information of the tuple fields, in field order.
-        */
-       @SuppressWarnings("unchecked")
-       public TupleTypeInfo(TypeInformation<?>... types) {
-               this((Class<T>) Tuple.getTupleClass(types.length), types);
-       }
-
-       /**
-        * Creates a tuple type information for the given tuple class and field types. The fields are
-        * named {@code f0}, {@code f1}, ... in field order.
-        *
-        * @param tupleType The tuple class.
-        * @param types The type information of the tuple fields; at most {@code Tuple.MAX_ARITY} entries.
-        */
-       public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
-               super(tupleType, types);
-
-               final int arity = types.length;
-               Preconditions.checkArgument(
-                       arity <= Tuple.MAX_ARITY,
-                       "The tuple type exceeds the maximum supported arity.");
-
-               final String[] names = new String[arity];
-               int position = 0;
-               while (position < arity) {
-                       names[position] = "f" + position;
-                       position++;
-               }
-               this.fieldNames = names;
-       }
-
-       /**
-        * Returns the names of the tuple fields.
-        *
-        * @return The internal field name array itself, not a copy; modifying it changes the state
-        *         that {@code equals} and {@code hashCode} of this object are computed from.
-        */
-       @Override
-       public String[] getFieldNames() {
-               return fieldNames;
-       }
-
-       /**
-        * Returns the position of the field with the given name.
-        *
-        * <p>The name is compared against the stored field names ({@code f0}, {@code f1}, ...).
-        * Names that do not match exactly, including {@code null}, empty or malformed names, yield
-        * {@code -1} instead of a parsing exception.
-        *
-        * @param fieldName The name of the field to find.
-        * @return The index of the field, or {@code -1} if this tuple has no field with that name.
-        */
-       @Override
-       public int getFieldIndex(String fieldName) {
-               for (int i = 0; i < fieldNames.length; i++) {
-                       if (fieldNames[i].equals(fieldName)) {
-                               return i;
-                       }
-               }
-               return -1;
-       }
-
-       /**
-        * Creates the serializer for this tuple type.
-        *
-        * <p>For {@code Tuple0} the shared {@code Tuple0Serializer} instance is returned; otherwise a
-        * {@code TupleSerializer} is built from the serializers of the field types.
-        *
-        * @param executionConfig The configuration passed on to the field serializers.
-        * @return The serializer for the tuple type.
-        */
-       @SuppressWarnings("unchecked")
-       @Override
-       public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-               if (getTypeClass() == Tuple0.class) {
-                       return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
-               }
-
-               final TypeSerializer<?>[] serializers = new TypeSerializer<?>[getArity()];
-               int slot = 0;
-               for (TypeInformation<?> fieldType : types) {
-                       serializers[slot] = fieldType.createSerializer(executionConfig);
-                       slot++;
-               }
-
-               return new TupleSerializer<T>(getTypeClass(), serializers);
-       }
-
-       /**
-        * Creates a fresh {@code TupleTypeComparatorBuilder} bound to this type information.
-        *
-        * @return A new, empty comparator builder.
-        */
-       @Override
-       protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
-               return new TupleTypeComparatorBuilder();
-       }
-
-       /**
-        * Collects key field positions and their comparators and assembles them into a
-        * {@code TupleComparator} for the enclosing tuple type.
-        */
-       private class TupleTypeComparatorBuilder implements TypeComparatorBuilder<T> {
-
-               // raw TypeComparator: the comparators are later copied into a TypeComparator[] array
-               private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
-               private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();
-
-               /**
-                * Reserves capacity for the expected number of key fields.
-                *
-                * @param size The expected number of comparator fields.
-                */
-               @Override
-               public void initializeTypeComparatorBuilder(int size) {
-                       fieldComparators.ensureCapacity(size);
-                       logicalKeyFields.ensureCapacity(size);
-               }
-
-               /**
-                * Registers a key field position together with the comparator for it.
-                *
-                * @param fieldId The position of the key field in the tuple.
-                * @param comparator The comparator for that field.
-                */
-               @Override
-               public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
-                       fieldComparators.add(comparator);
-                       logicalKeyFields.add(fieldId);
-               }
-
-               /**
-                * Builds the comparator from everything registered so far.
-                *
-                * @param config The execution configuration used to create the field serializers.
-                * @return A {@code TupleComparator} over the registered key fields.
-                * @throws IllegalStateException If no key field was registered or the largest key position is negative.
-                */
-               @Override
-               public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
-                       final int comparatorCount = fieldComparators.size();
-                       final int keyCount = logicalKeyFields.size();
-
-                       Preconditions.checkState(
-                               comparatorCount > 0,
-                               "No field comparators were defined for the TupleTypeComparatorBuilder."
-                       );
-
-                       Preconditions.checkState(
-                               keyCount > 0,
-                               "No key fields were defined for the TupleTypeComparatorBuilder."
-                       );
-
-                       Preconditions.checkState(
-                               comparatorCount == keyCount,
-                               "The number of field comparators and key fields is not equal."
-                       );
-
-                       final int maxKey = Collections.max(logicalKeyFields);
-
-                       Preconditions.checkState(
-                               maxKey >= 0,
-                               "The maximum key field must be greater or equal than 0."
-                       );
-
-                       // serializers are created for every field up to and including the largest key position
-                       final int serializerCount = maxKey + 1;
-                       final TypeSerializer<?>[] serializers = new TypeSerializer<?>[serializerCount];
-                       for (int position = 0; position < serializerCount; position++) {
-                               serializers[position] = types[position].createSerializer(config);
-                       }
-
-                       final TypeComparator[] comparators = fieldComparators.toArray(new TypeComparator[comparatorCount]);
-
-                       return new TupleComparator<T>(
-                               Ints.toArray(logicalKeyFields),
-                               comparators,
-                               serializers
-                       );
-               }
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Compares this type information with another object.
-        *
-        * @param obj The object to compare with.
-        * @return {@code true} if {@code obj} is a {@code TupleTypeInfo} that can equal this one, the
-        *         superclass considers both equal, and both have equal field names.
-        */
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof TupleTypeInfo)) {
-                       return false;
-               }
-
-               @SuppressWarnings("unchecked")
-               TupleTypeInfo<T> that = (TupleTypeInfo<T>) obj;
-
-               return that.canEqual(this) &&
-                       super.equals(that) &&
-                       Arrays.equals(fieldNames, that.fieldNames);
-       }
-
-       /**
-        * Tells whether the given object may be compared for equality with this type information.
-        *
-        * @param obj The candidate object.
-        * @return {@code true} if {@code obj} is an instance of {@code TupleTypeInfo}.
-        */
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof TupleTypeInfo;
-       }
-       
-       /**
-        * Computes a hash code from the superclass hash code and the field names.
-        *
-        * @return The combined hash code.
-        */
-       @Override
-       public int hashCode() {
-               final int baseHash = super.hashCode();
-               final int namesHash = Arrays.hashCode(fieldNames);
-               return 31 * baseHash + namesHash;
-       }
-       
-       /**
-        * Returns the superclass string representation prefixed with {@code "Java "}.
-        *
-        * @return The string representation of this tuple type information.
-        */
-       @Override
-       public String toString() {
-               return "Java " + super.toString();
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a tuple type information from classes of basic types.
-        *
-        * @param basicTypes The classes of the tuple fields, in field order. Every class must be one
-        *        for which {@code BasicTypeInfo.getInfoFor} returns a type information.
-        * @return The tuple type information with one field per given class.
-        * @throws IllegalArgumentException If no classes are given, or if a class is {@code null} or
-        *         not a basic type.
-        */
-       public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>... basicTypes) {
-               if (basicTypes == null || basicTypes.length == 0) {
-                       // say what is wrong instead of throwing an exception without a message
-                       throw new IllegalArgumentException("At least one basic type must be given to create a tuple type.");
-               }
-               
-               TypeInformation<?>[] infos = new TypeInformation<?>[basicTypes.length];
-               for (int i = 0; i < infos.length; i++) {
-                       Class<?> type = basicTypes[i];
-                       if (type == null) {
-                               throw new IllegalArgumentException("Type at position " + i + " is null.");
-                       }
-                       
-                       TypeInformation<?> info = BasicTypeInfo.getInfoFor(type);
-                       if (info == null) {
-                               throw new IllegalArgumentException("Type at position " + i + " is not a basic type.");
-                       }
-                       infos[i] = info;
-               }
-
-               @SuppressWarnings("unchecked")
-               TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new TupleTypeInfo<Tuple>(infos);
-               return tupleInfo;
-       }
-
-       /**
-        * Creates a tuple type information from classes of basic types and basic value types.
-        *
-        * <p>For every class, {@code BasicTypeInfo.getInfoFor} is tried first. If it yields nothing,
-        * the class is resolved through {@code ValueTypeInfo.getValueTypeInfo} and must be reported as
-        * a basic value type.
-        *
-        * @param basicTypes The classes of the tuple fields, in field order.
-        * @return The tuple type information with one field per given class.
-        * @throws IllegalArgumentException If no classes are given, or if a class is {@code null} or
-        *         neither a basic type nor a basic value type.
-        */
-       @SuppressWarnings("unchecked")
-       public static <X extends Tuple> TupleTypeInfo<X> getBasicAndBasicValueTupleTypeInfo(Class<?>... basicTypes) {
-               if (basicTypes == null || basicTypes.length == 0) {
-                       // say what is wrong instead of throwing an exception without a message
-                       throw new IllegalArgumentException("At least one type must be given to create a tuple type.");
-               }
-
-               TypeInformation<?>[] infos = new TypeInformation<?>[basicTypes.length];
-               for (int i = 0; i < infos.length; i++) {
-                       Class<?> type = basicTypes[i];
-                       if (type == null) {
-                               throw new IllegalArgumentException("Type at position " + i + " is null.");
-                       }
-
-                       TypeInformation<?> info = BasicTypeInfo.getInfoFor(type);
-                       if (info == null) {
-                               // not a basic type: fall back to the value type lookup
-                               try {
-                                       info = ValueTypeInfo.getValueTypeInfo((Class<Value>) type);
-                                       if (!((ValueTypeInfo<?>) info).isBasicValueType()) {
-                                               throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type.");
-                                       }
-                               } catch (ClassCastException | InvalidTypesException e) {
-                                       throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type.", e);
-                               }
-                       }
-                       infos[i] = info;
-               }
-
-
-               return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
deleted file mode 100644
index 469476f..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-
-public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       private final static String REGEX_FIELD = "(f?)([0-9]+)";
-       private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
-       private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
-                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR
-                       +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
-
-       private static final Pattern PATTERN_FIELD = 
Pattern.compile(REGEX_FIELD);
-       private static final Pattern PATTERN_NESTED_FIELDS = 
Pattern.compile(REGEX_NESTED_FIELDS);
-       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       protected final TypeInformation<?>[] types;
-       
-       private final int totalFields;
-
-       public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... 
types) {
-               super(tupleType);
-
-               this.types = Preconditions.checkNotNull(types);
-
-               int fieldCounter = 0;
-
-               for(TypeInformation<?> type : types) {
-                       fieldCounter += type.getTotalFields();
-               }
-
-               totalFields = fieldCounter;
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return true;
-       }
-
-       public boolean isCaseClass() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return types.length;
-       }
-       
-       @Override
-       public int getTotalFields() {
-               return totalFields;
-       }
-
-       @Override
-       public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
-
-               Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
-               if (!matcher.matches()) {
-                       throw new InvalidFieldReferenceException("Invalid tuple 
field reference \""+fieldExpression+"\".");
-               }
-
-               String field = matcher.group(0);
-               if (field.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
-                       // handle select all
-                       int keyPosition = 0;
-                       for (TypeInformation<?> type : types) {
-                               if (type instanceof CompositeType) {
-                                       CompositeType<?> cType = 
(CompositeType<?>) type;
-                                       
cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + 
keyPosition, result);
-                                       keyPosition += cType.getTotalFields() - 
1;
-                               } else {
-                                       result.add(new 
FlatFieldDescriptor(offset + keyPosition, type));
-                               }
-                               keyPosition++;
-                       }
-               } else {
-                       String fieldStr = matcher.group(1);
-                       Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
-
-                       if (!fieldMatcher.matches()) {
-                               throw new RuntimeException("Invalid matcher 
pattern");
-                       }
-
-                       field = fieldMatcher.group(2);
-                       int fieldPos = Integer.valueOf(field);
-
-                       if (fieldPos >= this.getArity()) {
-                               throw new InvalidFieldReferenceException("Tuple 
field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + 
".");
-                       }
-                       TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
-                       String tail = matcher.group(5);
-                       if (tail == null) {
-                               if (fieldType instanceof CompositeType) {
-                                       // forward offsets
-                                       for (int i = 0; i < fieldPos; i++) {
-                                               offset += 
this.getTypeAt(i).getTotalFields();
-                                       }
-                                       // add all fields of composite type
-                                       ((CompositeType<?>) 
fieldType).getFlatFields("*", offset, result);
-                               } else {
-                                       // we found the field to add
-                                       // compute flat field position by 
adding skipped fields
-                                       int flatFieldPos = offset;
-                                       for (int i = 0; i < fieldPos; i++) {
-                                               flatFieldPos += 
this.getTypeAt(i).getTotalFields();
-                                       }
-                                       result.add(new 
FlatFieldDescriptor(flatFieldPos, fieldType));
-                               }
-                       } else {
-                               if (fieldType instanceof CompositeType<?>) {
-                                       // forward offset
-                                       for (int i = 0; i < fieldPos; i++) {
-                                               offset += 
this.getTypeAt(i).getTotalFields();
-                                       }
-                                       ((CompositeType<?>) 
fieldType).getFlatFields(tail, offset, result);
-                               } else {
-                                       throw new 
InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not 
possible on atomic type " + fieldType + ".");
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-
-               Matcher matcher = 
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
-               if(!matcher.matches()) {
-                       if 
(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || 
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
-                               throw new 
InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
-                       } else {
-                               throw new 
InvalidFieldReferenceException("Invalid format of tuple field expression 
\""+fieldExpression+"\".");
-                       }
-               }
-
-               String fieldStr = matcher.group(1);
-               Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
-               if(!fieldMatcher.matches()) {
-                       throw new RuntimeException("Invalid matcher pattern");
-               }
-               String field = fieldMatcher.group(2);
-               int fieldPos = Integer.valueOf(field);
-
-               if(fieldPos >= this.getArity()) {
-                       throw new InvalidFieldReferenceException("Tuple field 
expression \""+fieldStr+"\" out of bounds of "+this.toString()+".");
-               }
-               TypeInformation<X> fieldType = this.getTypeAt(fieldPos);
-               String tail = matcher.group(5);
-               if(tail == null) {
-                       // we found the type
-                       return fieldType;
-               } else {
-                       if(fieldType instanceof CompositeType<?>) {
-                               return ((CompositeType<?>) 
fieldType).getTypeAt(tail);
-                       } else {
-                               throw new 
InvalidFieldReferenceException("Nested field expression \""+tail+"\" not 
possible on atomic type "+fieldType+".");
-                       }
-               }
-       }
-       
-       @Override
-       public <X> TypeInformation<X> getTypeAt(int pos) {
-               if (pos < 0 || pos >= this.types.length) {
-                       throw new IndexOutOfBoundsException();
-               }
-
-               @SuppressWarnings("unchecked")
-               TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
-               return typed;
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof TupleTypeInfoBase) {
-                       @SuppressWarnings("unchecked")
-                       TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;
-
-                       return other.canEqual(this) &&
-                               super.equals(other) &&
-                               Arrays.equals(types, other.types) &&
-                               totalFields == other.totalFields;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof TupleTypeInfoBase;
-       }
-       
-       @Override
-       public int hashCode() {
-               return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + 
totalFields;
-       }
-
-       @Override
-       public String toString() {
-               StringBuilder bld = new StringBuilder("Tuple");
-               bld.append(types.length);
-               if (types.length > 0) {
-                       bld.append('<').append(types[0]);
-
-                       for (int i = 1; i < types.length; i++) {
-                               bld.append(", ").append(types[i]);
-                       }
-
-                       bld.append('>');
-               }
-               return bld.toString();
-       }
-
-       @Override
-       public boolean hasDeterministicFieldOrder() {
-               return true;
-       }
-}

Reply via email to