Repository: flink
Updated Branches:
  refs/heads/master 06503c8fd -> d8dbaeeb4


http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index d088d16..e9d5dac 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -19,6 +19,8 @@
                                            
 package org.apache.flink.api.java.typeutils;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -36,6 +38,7 @@ public class TypeInfoParser {
 
        private static final Pattern tuplePattern = Pattern.compile("^((" + 
TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Tuple[0-9]+)<");
        private static final Pattern writablePattern = Pattern.compile("^((" + 
WRITABLE_PACKAGE.replaceAll("\\.", "\\\\.") + 
"\\.)?Writable)<([^\\s,>]*)(,|>|$)");
+       private static final Pattern enumPattern = 
Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$)");
        private static final Pattern basicTypePattern = Pattern
                        
.compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean))(,|>|$)");
        private static final Pattern basicType2Pattern = 
Pattern.compile("^(int|byte|short|char|double|float|long|boolean)(,|>|$)");
@@ -44,7 +47,8 @@ public class TypeInfoParser {
        private static final Pattern basicArrayTypePattern = Pattern
                        
.compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean))\\[\\](,|>|$)");
        private static final Pattern basicArrayType2Pattern = 
Pattern.compile("^(int|byte|short|char|double|float|long|boolean)\\[\\](,|>|$)");
-       private static final Pattern customObjectPattern = 
Pattern.compile("^([^\\s,>]+)(,|>|$)");
+       private static final Pattern pojoGenericObjectPattern = 
Pattern.compile("^([^\\s,<>]+)(<)?");
+       private static final Pattern fieldPattern = 
Pattern.compile("^([^\\s,<>]+)=");
 
        /**
         * Generates an instance of <code>TypeInformation</code> by parsing a 
type
@@ -57,18 +61,19 @@ public class TypeInfoParser {
         * <code>String[]</code>, etc.
         * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
         * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
-        * <li>Custom types such as <code>org.my.CustomClass</code>,
-        * <code>org.my.CustomClass$StaticInnerClass</code>, etc.
+        * <li>Pojo types such as 
<code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, 
etc.</li>
+        * <li>Generic types such as <code>java.lang.Class</code>, etc.
         * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
         * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
         * <li>Value types such as <code>DoubleValue</code>,
         * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
-        * <li>Tuple array types such as <code>Tuple2<TYPE0,TYPE1>[], 
etc.</code></li>
+        * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], 
etc.</code></li>
         * <li>Writable types such as 
<code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+        * <li>Enum types such as 
<code>Enum&lt;org.my.CustomEnum&gt;</code></li>
         * </ul>
         *
         * Example:
-        * 
<code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyClass&gt;&gt;"</code>
+        * 
<code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
         *
         * @param infoString
         *            type information string to be parsed
@@ -97,6 +102,7 @@ public class TypeInfoParser {
                final Matcher tupleMatcher = tuplePattern.matcher(infoString);
 
                final Matcher writableMatcher = 
writablePattern.matcher(infoString);
+               final Matcher enumMatcher = enumPattern.matcher(infoString);
 
                final Matcher basicTypeMatcher = 
basicTypePattern.matcher(infoString);
                final Matcher basicType2Matcher = 
basicType2Pattern.matcher(infoString);
@@ -106,7 +112,7 @@ public class TypeInfoParser {
                final Matcher basicArrayTypeMatcher = 
basicArrayTypePattern.matcher(infoString);
                final Matcher basicArrayType2Matcher = 
basicArrayType2Pattern.matcher(infoString);
 
-               final Matcher customObjectMatcher = 
customObjectPattern.matcher(infoString);
+               final Matcher pojoGenericMatcher = 
pojoGenericObjectPattern.matcher(infoString);
 
                if (infoString.length() == 0) {
                        return null;
@@ -163,15 +169,17 @@ public class TypeInfoParser {
                else if (writableMatcher.find()) {
                        String className = writableMatcher.group(1);
                        String fullyQualifiedName = writableMatcher.group(3);
-                       sb.delete(0, className.length() + 1 + 
fullyQualifiedName.length());
-
-                       try {
-                               Class<?> clazz = 
Class.forName(fullyQualifiedName);
-                               returnType = 
WritableTypeInfo.getWritableTypeInfo((Class) clazz);
-                       } catch (ClassNotFoundException e) {
-                               throw new IllegalArgumentException("Class '" + 
fullyQualifiedName
-                                               + "' could not be found for use 
as writable type. Please note that inner classes must be declared static.");
-                       }
+                       sb.delete(0, className.length() + 1 + 
fullyQualifiedName.length() + 1);
+                       Class<?> clazz = loadClass(fullyQualifiedName);
+                       returnType = 
WritableTypeInfo.getWritableTypeInfo((Class) clazz);
+               }
+               // enum types
+               else if (enumMatcher.find()) {
+                       String className = enumMatcher.group(1);
+                       String fullyQualifiedName = enumMatcher.group(3);
+                       sb.delete(0, className.length() + 1 + 
fullyQualifiedName.length() + 1);
+                       Class<?> clazz = loadClass(fullyQualifiedName);
+                       returnType = new EnumTypeInfo(clazz);
                }
                // basic types of classes
                else if (basicTypeMatcher.find()) {
@@ -225,7 +233,7 @@ public class TypeInfoParser {
                        }
                        returnType = 
ValueTypeInfo.getValueTypeInfo((Class<Value>) clazz);
                }
-               // array of classes
+               // array of basic classes
                else if (basicArrayTypeMatcher.find()) {
                        String className = basicArrayTypeMatcher.group(1);
                        sb.delete(0, className.length() + 2);
@@ -263,32 +271,43 @@ public class TypeInfoParser {
                        }
                        returnType = PrimitiveArrayTypeInfo.getInfoFor(clazz);
                }
-               // custom objects
-               else if (customObjectMatcher.find()) {
-                       String fullyQualifiedName = 
customObjectMatcher.group(1);
+               // pojo objects or generic types
+               else if (pojoGenericMatcher.find()) {
+                       String fullyQualifiedName = pojoGenericMatcher.group(1);
                        sb.delete(0, fullyQualifiedName.length());
 
-                       if (fullyQualifiedName.contains("<")) {
-                               throw new 
IllegalArgumentException("Parameterized custom classes are not supported by 
parser.");
-                       }
+                       boolean isPojo = pojoGenericMatcher.group(2) != null;
+
+                       if (isPojo) {
+                               sb.deleteCharAt(0);
+                               Class<?> clazz = loadClass(fullyQualifiedName);
+
+                               ArrayList<PojoField> fields = new 
ArrayList<PojoField>();
+                               while (sb.charAt(0) != '>') {
+                                       final Matcher fieldMatcher = 
fieldPattern.matcher(sb);
+                                       if (!fieldMatcher.find()) {
+                                               throw new 
IllegalArgumentException("Field name missing.");
+                                       }
+                                       String fieldName = 
fieldMatcher.group(1);
+                                       sb.delete(0, fieldName.length() + 1);
 
-                       // custom object array
-                       if (fullyQualifiedName.endsWith("[]")) {
-                               fullyQualifiedName = 
fullyQualifiedName.substring(0, fullyQualifiedName.length() - 2);
-                               try {
-                                       Class<?> clazz = Class.forName("[L" + 
fullyQualifiedName + ";");
-                                       returnType = 
ObjectArrayTypeInfo.getInfoFor(clazz);
-                               } catch (ClassNotFoundException e) {
-                                       throw new 
IllegalArgumentException("Class '" + fullyQualifiedName
-                                                       + "' could not be found 
for use as object array. Please note that inner classes must be declared 
static.");
+                                       Field field = null;
+                                       try {
+                                               field = 
clazz.getDeclaredField(fieldName);
+                                       } catch (Exception e) {
+                                               throw new 
IllegalArgumentException("Field '" + fieldName + "' could not be accessed.");
+                                       }
+                                       fields.add(new PojoField(field, 
parse(sb)));
                                }
-                       } else {
-                               try {
-                                       Class<?> clazz = 
Class.forName(fullyQualifiedName);
-                                       returnType = new GenericTypeInfo(clazz);
-                               } catch (ClassNotFoundException e) {
-                                       throw new 
IllegalArgumentException("Class '" + fullyQualifiedName
-                                                       + "' could not be found 
for use as custom object. Please note that inner classes must be declared 
static.");
+                               returnType = new PojoTypeInfo(clazz, fields);
+                       }
+                       else {
+                               // custom object array
+                               if (fullyQualifiedName.endsWith("[]")) {
+                                       fullyQualifiedName = 
fullyQualifiedName.substring(0, fullyQualifiedName.length() - 2);
+                                       returnType = 
ObjectArrayTypeInfo.getInfoFor(loadClass("[L" + fullyQualifiedName + ";"));
+                               } else {
+                                       returnType = new 
GenericTypeInfo(loadClass(fullyQualifiedName));
                                }
                        }
                }
@@ -304,4 +323,13 @@ public class TypeInfoParser {
                }
        }
 
+       private static Class<?> loadClass(String fullyQualifiedName) {
+               try {
+                       return Class.forName(fullyQualifiedName);
+               } catch (ClassNotFoundException e) {
+                       throw new IllegalArgumentException("Class '" + 
fullyQualifiedName
+                                       + "' could not be found. Please note 
that inner classes must be declared static.");
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
new file mode 100644
index 0000000..6eb536d
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class DistinctTranslationTest {
+
+       @Test
+       public void testCombinable() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<String> input = env.fromElements("1", "2", "1", 
"3");
+                       
+                       
+                       DistinctOperator<String> op = input.distinct(new 
KeySelector<String, String>() {
+                               public String getKey(String value) { return 
value; }
+                       });
+                       
+                       op.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       
+                       GroupReduceOperatorBase<?, ?, ?> reduceOp = 
(GroupReduceOperatorBase<?, ?, ?>) 
p.getDataSinks().iterator().next().getInput();
+                       assertTrue(reduceOp.isCombinable());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateDistinctPlain() {
+               try {
+                       final int DOP = 8;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
+
+                       initialData.distinct().print();
+
+                       Plan p = env.createProgramPlan();
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+
+                       // currently distinct is translated to a GroupReduce
+                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+                       // check types
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
+
+                       // check keys
+                       assertArrayEquals(new int[] {0, 1, 2}, 
reducer.getKeyColumns(0));
+
+                       // DOP was not configured on the operator
+                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
+
+                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateDistinctPlain2() {
+               try {
+                       final int DOP = 8;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
+
+                       DataSet<CustomType> initialData = 
getSourcePojoDataSet(env);
+
+                       initialData.distinct().print();
+
+                       Plan p = env.createProgramPlan();
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+
+                       // currently distinct is translated to a GroupReduce
+                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+                       // check types
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
+
+                       // check keys
+                       assertArrayEquals(new int[] {0}, 
reducer.getKeyColumns(0));
+
+                       // DOP was not configured on the operator
+                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
+
+                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateDistinctPosition() {
+               try {
+                       final int DOP = 8;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
+
+                       initialData.distinct(1, 2).print();
+
+                       Plan p = env.createProgramPlan();
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+
+                       // currently distinct is translated to a GroupReduce
+                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+                       // check types
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
+
+                       // check keys
+                       assertArrayEquals(new int[] {1, 2}, 
reducer.getKeyColumns(0));
+
+                       // DOP was not configured on the operator
+                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
+
+                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateDistinctKeySelector() {
+               try {
+                       final int DOP = 8;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
+
+                       initialData.distinct(new 
KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+                               public StringValue getKey(Tuple3<Double, 
StringValue, LongValue> value) {
+                                       return value.f1;
+                               }
+                       }).setParallelism(4).print();
+
+                       Plan p = env.createProgramPlan();
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+
+                       PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = 
(PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
+                       MapOperatorBase<?, ?, ?> keyExtractor = 
(MapOperatorBase<?, ?, ?>) reducer.getInput();
+
+                       // check the DOPs
+                       assertEquals(1, keyExtractor.getDegreeOfParallelism());
+                       assertEquals(4, reducer.getDegreeOfParallelism());
+
+                       // check types
+                       TypeInformation<?> keyValueInfo = new 
TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+                                       new 
ValueTypeInfo<StringValue>(StringValue.class),
+                                       initialData.getType());
+
+                       assertEquals(initialData.getType(), 
keyExtractor.getOperatorInfo().getInputType());
+                       assertEquals(keyValueInfo, 
keyExtractor.getOperatorInfo().getOutputType());
+
+                       assertEquals(keyValueInfo, 
reducer.getOperatorInfo().getInputType());
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
+
+                       // check keys
+                       assertEquals(KeyExtractingMapper.class, 
keyExtractor.getUserCodeWrapper().getUserCodeClass());
+
+                       assertTrue(keyExtractor.getInput() instanceof 
GenericDataSourceBase<?, ?>);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateDistinctExpressionKey() {
+               try {
+                       final int DOP = 8;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
+
+                       DataSet<CustomType> initialData = 
getSourcePojoDataSet(env);
+
+                       initialData.distinct("myInt").print();
+
+                       Plan p = env.createProgramPlan();
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+
+                       // currently distinct is translated to a GroupReduce
+                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+                       // check types
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
+                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
+
+                       // check keys
+                       assertArrayEquals(new int[] {0}, 
reducer.getKeyColumns(0));
+
+                       // DOP was not configured on the operator
+                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
+
+                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static final DataSet<Tuple3<Double, StringValue, LongValue>> 
getSourceDataSet(ExecutionEnvironment env) {
+               return env.fromElements(new Tuple3<Double, StringValue, 
LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
+                               .setParallelism(1);
+       }
+
+       private static DataSet<CustomType> 
getSourcePojoDataSet(ExecutionEnvironment env) {
+               List<CustomType> data = new ArrayList<CustomType>();
+               data.add(new CustomType(1));
+               return env.fromCollection(data);
+       }
+
+       public static class CustomType implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+               public int myInt;
+               @SuppressWarnings("unused")
+               public CustomType() {};
+
+               public CustomType(int i) {
+                       myInt = i;
+               }
+
+               @Override
+               public String toString() {
+                       return ""+myInt;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
deleted file mode 100644
index d06731e..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.DistinctOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class DistrinctTranslationTest {
-
-       @Test
-       public void testCombinable() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<String> input = env.fromElements("1", "2", "1", 
"3");
-                       
-                       
-                       DistinctOperator<String> op = input.distinct(new 
KeySelector<String, String>() {
-                               public String getKey(String value) { return 
value; }
-                       });
-                       
-                       op.print();
-                       
-                       Plan p = env.createProgramPlan();
-                       
-                       GroupReduceOperatorBase<?, ?, ?> reduceOp = 
(GroupReduceOperatorBase<?, ?, ?>) 
p.getDataSinks().iterator().next().getInput();
-                       assertTrue(reduceOp.isCombinable());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void translateDistinctPlain() {
-               try {
-                       final int DOP = 8;
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
-
-                       DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
-
-                       initialData.distinct().print();
-
-                       Plan p = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
-
-                       // currently distinct is translated to a GroupReduce
-                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-
-                       // check types
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
-
-                       // check keys
-                       assertArrayEquals(new int[] {0, 1, 2}, 
reducer.getKeyColumns(0));
-
-                       // DOP was not configured on the operator
-                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
-
-                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test caused an error: " + e.getMessage());
-               }
-       }
-
-       @Test
-       public void translateDistinctPlain2() {
-               try {
-                       final int DOP = 8;
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
-
-                       DataSet<CustomType> initialData = 
getSourcePojoDataSet(env);
-
-                       initialData.distinct().print();
-
-                       Plan p = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
-
-                       // currently distinct is translated to a GroupReduce
-                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-
-                       // check types
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
-
-                       // check keys
-                       assertArrayEquals(new int[] {0}, 
reducer.getKeyColumns(0));
-
-                       // DOP was not configured on the operator
-                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
-
-                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test caused an error: " + e.getMessage());
-               }
-       }
-
-       @Test
-       public void translateDistinctPosition() {
-               try {
-                       final int DOP = 8;
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
-
-                       DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
-
-                       initialData.distinct(1, 2).print();
-
-                       Plan p = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
-
-                       // currently distinct is translated to a GroupReduce
-                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-
-                       // check types
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
-
-                       // check keys
-                       assertArrayEquals(new int[] {1, 2}, 
reducer.getKeyColumns(0));
-
-                       // DOP was not configured on the operator
-                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
-
-                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test caused an error: " + e.getMessage());
-               }
-       }
-
-       @Test
-       public void translateDistinctKeySelector() {
-               try {
-                       final int DOP = 8;
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
-
-                       DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
-
-                       initialData.distinct(new 
KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
-                               public StringValue getKey(Tuple3<Double, 
StringValue, LongValue> value) {
-                                       return value.f1;
-                               }
-                       }).setParallelism(4).print();
-
-                       Plan p = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
-
-                       PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = 
(PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
-                       MapOperatorBase<?, ?, ?> keyExtractor = 
(MapOperatorBase<?, ?, ?>) reducer.getInput();
-
-                       // check the DOPs
-                       assertEquals(1, keyExtractor.getDegreeOfParallelism());
-                       assertEquals(4, reducer.getDegreeOfParallelism());
-
-                       // check types
-                       TypeInformation<?> keyValueInfo = new 
TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
-                                       new 
ValueTypeInfo<StringValue>(StringValue.class),
-                                       initialData.getType());
-
-                       assertEquals(initialData.getType(), 
keyExtractor.getOperatorInfo().getInputType());
-                       assertEquals(keyValueInfo, 
keyExtractor.getOperatorInfo().getOutputType());
-
-                       assertEquals(keyValueInfo, 
reducer.getOperatorInfo().getInputType());
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
-
-                       // check keys
-                       assertEquals(KeyExtractingMapper.class, 
keyExtractor.getUserCodeWrapper().getUserCodeClass());
-
-                       assertTrue(keyExtractor.getInput() instanceof 
GenericDataSourceBase<?, ?>);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test caused an error: " + e.getMessage());
-               }
-       }
-
-       @Test
-       public void translateDistinctExpressionKey() {
-               try {
-                       final int DOP = 8;
-                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(DOP);
-
-                       DataSet<CustomType> initialData = 
getSourcePojoDataSet(env);
-
-                       initialData.distinct("myInt").print();
-
-                       Plan p = env.createProgramPlan();
-
-                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
-
-                       // currently distinct is translated to a GroupReduce
-                       GroupReduceOperatorBase<?, ?, ?> reducer = 
(GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-
-                       // check types
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getInputType());
-                       assertEquals(initialData.getType(), 
reducer.getOperatorInfo().getOutputType());
-
-                       // check keys
-                       assertArrayEquals(new int[] {0}, 
reducer.getKeyColumns(0));
-
-                       // DOP was not configured on the operator
-                       assertTrue(reducer.getDegreeOfParallelism() == 1 || 
reducer.getDegreeOfParallelism() == -1);
-
-                       assertTrue(reducer.getInput() instanceof 
GenericDataSourceBase<?, ?>);
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test caused an error: " + e.getMessage());
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       private static final DataSet<Tuple3<Double, StringValue, LongValue>> 
getSourceDataSet(ExecutionEnvironment env) {
-               return env.fromElements(new Tuple3<Double, StringValue, 
LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
-                               .setParallelism(1);
-       }
-
-       private static DataSet<CustomType> 
getSourcePojoDataSet(ExecutionEnvironment env) {
-               List<CustomType> data = new ArrayList<CustomType>();
-               data.add(new CustomType(1));
-               return env.fromCollection(data);
-       }
-
-       public static class CustomType implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-               public int myInt;
-               @SuppressWarnings("unused")
-               public CustomType() {};
-
-               public CustomType(int i) {
-                       myInt = i;
-               }
-
-               @Override
-               public String toString() {
-                       return ""+myInt;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
index 23a4d5f..9834a25 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "serial", "deprecation" })
 public class CoGroupWrappingFunctionTest {
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
index 7b28c3f..9e262fc 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({ "serial", "deprecation" })
 public class ReduceWrappingFunctionTest {
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 96ba16b..39d6e10 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -53,6 +52,7 @@ import com.google.common.collect.HashMultiset;
 public class PojoTypeExtractionTest {
 
        public static class HasDuplicateField extends WC {
+               @SuppressWarnings("unused")
                private int count; // duplicate
        }
 
@@ -614,6 +614,6 @@ public class PojoTypeExtractionTest {
        @Test
        public void testGetterSetterWithVertex() {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<VertexTyped> set = env.fromElements(new VertexTyped(0L, 
3.0), new VertexTyped(1L, 1.0));
+               env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 
1.0));
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index f8fb13c..1364a2f 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -626,7 +627,7 @@ public class TypeExtractorTest {
 
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
-       public void testMissingTupleGenericsException() {
+       public void testMissingTupleGenerics() {
                RichMapFunction<?, ?> function = new RichMapFunction<String, 
Tuple2>() {
                        private static final long serialVersionUID = 1L;
 
@@ -636,11 +637,15 @@ public class TypeExtractorTest {
                        }
                };
 
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeInfoParser.parse("String"), "name", true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               
                try {
                        TypeExtractor.getMapReturnTypes(function, 
(TypeInformation) TypeInfoParser.parse("String"));
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // right
+                       Assert.fail("Expected an exception");
+               }
+               catch (InvalidTypesException e) {
+                       // expected
                }
        }
 
@@ -656,11 +661,15 @@ public class TypeExtractorTest {
                        }
                };
 
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeInfoParser.parse("String"), "name", true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               
                try {
                        TypeExtractor.getMapReturnTypes(function, 
(TypeInformation) TypeInfoParser.parse("String"));
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // right
+                       Assert.fail("Expected an exception");
+               }
+               catch (InvalidTypesException e) {
+                       // expected
                }
        }
 
@@ -795,11 +804,15 @@ public class TypeExtractorTest {
                        }
                };
 
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"), 
"name", true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               
                try {
                        TypeExtractor.getMapReturnTypes(function, 
TypeInfoParser.parse("String"));
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // right
+                       Assert.fail("Expected an exception");
+               }
+               catch (InvalidTypesException e) {
+                       // expected
                }
        }
 
@@ -893,14 +906,18 @@ public class TypeExtractorTest {
        }
 
        @Test
-       public void testFunctionDependingOnInputException() {
+       public void testFunctionDependingOnUnknownInput() {
                IdentityMapper3<Boolean, String> function = new 
IdentityMapper3<Boolean, String>();
 
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.BOOLEAN_TYPE_INFO, 
"name", true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               
                try {
                        TypeExtractor.getMapReturnTypes(function, 
BasicTypeInfo.BOOLEAN_TYPE_INFO);
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // right
+                       Assert.fail("Expected an exception");
+               }
+               catch (InvalidTypesException e) {
+                       // expected
                }
        }
 
@@ -1072,12 +1089,8 @@ public class TypeExtractorTest {
                        }
                };
                
-               try {
-                       TypeExtractor.getMapReturnTypes(function, 
BasicTypeInfo.STRING_TYPE_INFO);
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // good
-               }
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, 
true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
 
                RichMapFunction<String, ?> function2 = new 
RichMapFunction<String, AbstractClass>() {
                        private static final long serialVersionUID = 1L;
@@ -1088,12 +1101,8 @@ public class TypeExtractorTest {
                        }
                };
 
-               try {
-                       TypeExtractor.getMapReturnTypes(function2, 
BasicTypeInfo.STRING_TYPE_INFO);
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // slick!
-               }
+               TypeInformation<?> ti2 = 
TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, 
null, true);
+               Assert.assertTrue(ti2 instanceof MissingTypeInfo);
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -1108,11 +1117,15 @@ public class TypeExtractorTest {
                        }
                };
 
+               TypeInformation<?> ti 
=TypeExtractor.getMapReturnTypes(function, 
(TypeInformation)TypeInfoParser.parse("StringValue"), "name", true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               
                try {
                        TypeExtractor.getMapReturnTypes(function, 
(TypeInformation)TypeInfoParser.parse("StringValue"));
-                       Assert.fail("exception expected");
-               } catch (InvalidTypesException e) {
-                       // bam! go type extractor!
+                       Assert.fail("Expected an exception");
+               }
+               catch (InvalidTypesException e) {
+                       // expected
                }
        }
 
@@ -1366,14 +1379,19 @@ public class TypeExtractorTest {
        
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
-       public void testTypeErasureException() {
+       public void testTypeErasure() {
+               TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(new 
DummyFlatMapFunction<String, Integer, String, Boolean>(), 
+                                       (TypeInformation) 
TypeInfoParser.parse("Tuple2<String, Integer>"), "name", true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
+               
                try {
                        TypeExtractor.getFlatMapReturnTypes(new 
DummyFlatMapFunction<String, Integer, String, Boolean>(), 
                                        (TypeInformation) 
TypeInfoParser.parse("Tuple2<String, Integer>"));
-                       Assert.fail("exception expected");
+                       
+                       Assert.fail("Expected an exception");
                }
                catch (InvalidTypesException e) {
-                       // right
+                       // expected
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index 16d22a6..9153d8d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -149,12 +150,55 @@ public class TypeInfoParserTest {
        }
        
        @Test
-       public void testCustomType() {
+       public void testGenericType() {
                TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class");
                Assert.assertTrue(ti instanceof GenericTypeInfo);
                Assert.assertEquals(Class.class, ((GenericTypeInfo<?>) 
ti).getTypeClass());
        }
        
+       public static class MyPojo {
+               public Integer basic;
+               public Tuple2<String, Integer> tuple;
+               public MyWritable hadoopCitizen;
+               public String[] array;
+       }
+       
+       @Test
+       public void testPojoType() {
+               TypeInformation<?> ti = TypeInfoParser.parse(
+                               
"org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<"
+                               + "basic=Integer,"
+                               + "tuple=Tuple2<String, Integer>,"
+                               + 
"hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyWritable>,"
+                               + "array=String[]"
+                               + ">");
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
+               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+               Assert.assertEquals("array", 
pti.getPojoFieldAt(0).field.getName());
+               Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof 
BasicArrayTypeInfo);
+               Assert.assertEquals("basic", 
pti.getPojoFieldAt(1).field.getName());
+               Assert.assertTrue(pti.getPojoFieldAt(1).type instanceof 
BasicTypeInfo);
+               Assert.assertEquals("hadoopCitizen", 
pti.getPojoFieldAt(2).field.getName());
+               Assert.assertTrue(pti.getPojoFieldAt(2).type instanceof 
WritableTypeInfo);
+               Assert.assertEquals("tuple", 
pti.getPojoFieldAt(3).field.getName());
+               Assert.assertTrue(pti.getPojoFieldAt(3).type instanceof 
TupleTypeInfo);
+       }
+       
+       @Test
+       public void testPojoType2() {
+               TypeInformation<?> ti = 
TypeInfoParser.parse("Tuple2<String,Tuple2<Integer,org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>>>");
+               Assert.assertTrue(ti instanceof TupleTypeInfo);
+               TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+               Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo);
+               Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo);
+               TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(1);
+               Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo);
+               Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo);
+               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1);
+               Assert.assertEquals("basic", 
pti.getPojoFieldAt(0).field.getName());
+               Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof 
BasicTypeInfo);
+       }
+       
        public static class MyWritable implements Writable {
 
                @Override
@@ -198,6 +242,19 @@ public class TypeInfoParserTest {
                Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, 
ObjectArrayTypeInfo<GenericType<java.lang.Class>>, 
ValueType<org.apache.flink.types.StringValue>, Java Tuple1<Integer>>>", 
ti.toString());
        }
        
+       public static enum MyEnum {
+               ONE, TWO, THREE
+       }
+       
+       @Test
+       public void testEnumType() {
+               TypeInformation<?> ti = 
TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>");
+               
Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>",
 ti.toString());
+               
+               TypeInformation<?> ti2 = 
TypeInfoParser.parse("java.lang.Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>");
+               
Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>",
 ti2.toString());
+       }
+       
        @Test
        public void testException() {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index fa85f8c..49a3fe5 100644
--- 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.type.lambdas;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
+
 import junit.framework.Assert;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -39,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -54,23 +56,23 @@ public class LambdaExtractionTest {
                                @Override
                                public Integer map(String value) { return 
Integer.parseInt(value); }
                        };
-                       
+
                        MapFunction<?, ?> anonymousFromClass = new 
RichMapFunction<String, Integer>() {
                                @Override
                                public Integer map(String value) { return 
Integer.parseInt(value); }
                        };
-                       
+
                        MapFunction<?, ?> fromProperClass = new StaticMapper();
-                       
+
                        MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
                                @Override
                                public Tuple2<Integer, Long> map(Integer value) 
{
                                        return new Tuple2<Integer, Long>(value, 
1L);
                                }
                        };
-                       
+
                        MapFunction<String, Integer> lambda = (str) -> 
Integer.parseInt(str);
-                       
+
                        
assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
                        
assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
                        
assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
@@ -83,158 +85,168 @@ public class LambdaExtractionTest {
                        fail(e.getMessage());
                }
        }
-       
-       public static class StaticMapper implements MapFunction<String, 
Integer> {
 
+       public static class StaticMapper implements MapFunction<String, 
Integer> {
                @Override
                public Integer map(String value) { return 
Integer.parseInt(value); }
        }
-       
-       public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
 
+       public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
                @Override
                public Tuple2<T, Long> map(T value) throws Exception;
        }
-       
+
        private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) 
-> Integer.parseInt(str);
-       
+
        public static class MyClass {
                private String s = "mystring";
-               
+
                public MapFunction<Integer, String> getMapFunction() {
                        return (i) -> s;
                }
        }
-       
+
        @Test
-       public void testLambdaWithMemberVariable() {            
+       public void testLambdaWithMemberVariable() {
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new 
MyClass().getMapFunction(), TypeInfoParser.parse("Integer"));
                Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
        }
-       
+
        @Test
        public void testLambdaWithLocalVariable() {
                String s = "mystring";
                final int k = 24;
                int j = 26;
-               
+
                MapFunction<Integer, String> f = (i) -> s + k + j;
-               
+
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
TypeInfoParser.parse("Integer"));
                Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
        }
-       
+
        @Test
        public void testMapLambda() {
                MapFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-               
+
                TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testFlatMapLambda() {
                FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-               
+
                TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testMapPartitionLambda() {
                MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-               
+
                TypeInformation<?> ti = 
TypeExtractor.getMapPartitionReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testGroupReduceLambda() {
                GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
-               
+
                TypeInformation<?> ti = 
TypeExtractor.getGroupReduceReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testFlatJoinLambda() {
                FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, 
o) -> {};
-               
+
                TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testJoinLambda() {
                JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) 
-> null;
-               
+
                TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testCoGroupLambda() {
                CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, 
o) -> {};
-               
+
                TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testCrossLambda() {
                CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) 
-> null;
-               
+
                TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @Test
        public void testKeySelectorLambda() {
                KeySelector<Tuple2<Tuple1<Integer>, Boolean>, 
Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
-               
+
                TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, 
TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
-               Assert.assertTrue(ti.isTupleType());
-               Assert.assertEquals(2, ti.getArity());
-               Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
-               Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), 
BasicTypeInfo.STRING_TYPE_INFO);
+               if (!(ti instanceof MissingTypeInfo)) {
+                       Assert.assertTrue(ti.isTupleType());
+                       Assert.assertEquals(2, ti.getArity());
+                       Assert.assertTrue(((TupleTypeInfo<?>) 
ti).getTypeAt(0).isTupleType());
+                       Assert.assertEquals(((TupleTypeInfo<?>) 
ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+               }
        }
-       
+
        @SuppressWarnings("rawtypes")
        @Test
-       public void testLambdaTypeErasureException() {
+       public void testLambdaTypeErasure() {
                MapFunction<Tuple1, Tuple1> f = (i) -> null;
-               
-               try {
-                       TypeExtractor.getMapReturnTypes(f, 
TypeInfoParser.parse("Tuple1<String>"));
-                       Assert.fail();
-               }
-               catch (InvalidTypesException e) {
-                       // ok
-               }
+               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, 
TypeInfoParser.parse("Tuple1<String>"), null, true);
+               Assert.assertTrue(ti instanceof MissingTypeInfo);
        }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
new file mode 100644
index 0000000..350227a
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TypeHintITCase extends JavaProgramTestBase {
+
+       private static int NUM_PROGRAMS = 3;
+
+       private int curProgId = config.getInteger("ProgramId", -1);
+       private String resultPath;
+       private String expectedResult;
+
+       public TypeHintITCase(Configuration config) {
+               super(config);
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = TypeHintProgs.runProgram(curProgId, 
resultPath);
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+
+               return toParameterList(tConfigs);
+       }
+
+       private static class TypeHintProgs {
+
+               public static String runProgram(int progId, String resultPath) 
throws Exception {
+                       switch(progId) {
+                       // Test identity map with missing types and string type 
hint
+                       case 1: {
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+                               DataSet<Tuple3<Integer, Long, String>> 
identityMapDs = ds
+                                               .map(new Mapper<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>>())
+                                               .returns("Tuple3<Integer, Long, 
String>");
+                               identityMapDs.writeAsText(resultPath);
+                               env.execute();
+
+                               // return expected result
+                               return "(2,2,Hello)\n" +
+                               "(3,2,Hello world)\n" +
+                               "(1,1,Hi)\n";
+                       }
+                       // Test identity map with missing types and type 
information type hint
+                       case 2: {
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+                               DataSet<Tuple3<Integer, Long, String>> 
identityMapDs = ds
+                                               // all following generics get 
erased during compilation
+                                               .map(new Mapper<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>>())
+                                               .returns(new 
TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+                               identityMapDs.writeAsText(resultPath);
+                               env.execute();
+
+                               // return expected result
+                               return "(2,2,Hello)\n" +
+                               "(3,2,Hello world)\n" +
+                               "(1,1,Hi)\n";
+                       }
+                       // Test flat map with class type hint
+                       case 3: {
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+                               @SuppressWarnings({ "rawtypes", "unchecked" })
+                               DataSet<Integer> identityMapDs = ds.
+                               flatMap(new FlatMapper<Tuple3<Integer, Long, 
String>, Integer>())
+                               .returns((Class) Integer.class);
+                               identityMapDs.writeAsText(resultPath);
+                               env.execute();
+
+                               // return expected result
+                               return "2\n" +
+                               "3\n" +
+                               "1\n";
+                       }
+                       default: 
+                               throw new IllegalArgumentException("Invalid 
program id");
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class Mapper<T, V> implements MapFunction<T, V> {
+               private static final long serialVersionUID = 1L;
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public V map(T value) throws Exception {
+                       return (V) value;
+               }
+       }
+       
+       public static class FlatMapper<T, V> implements FlatMapFunction<T, V> {
+               private static final long serialVersionUID = 1L;
+
+               @SuppressWarnings({ "unchecked", "rawtypes" })
+               @Override
+               public void flatMap(T value, Collector<V> out) throws Exception 
{
+                       out.collect((V) ((Tuple3)value).f0);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
index 9c255f3..ab90757 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/ScalaAPICompletenessTest.scala
@@ -57,6 +57,10 @@ class ScalaAPICompletenessTest {
       "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed",
       
"org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
 
+      // TypeHints are only needed for Java API, Scala API doesn't need them
+      "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns",
+      "org.apache.flink.api.java.operators.TwoInputUdfOperator.returns",
+
       // This is really just a mapper, which in Scala can easily expressed as 
a map lambda
       "org.apache.flink.api.java.DataSet.writeAsFormattedText",
 

Reply via email to