Repository: flink
Updated Branches:
  refs/heads/release-1.0 43e5975d5 -> 23dc2a4ac


[FLINK-2788] [apis] Add TypeHint class to allow type-safe generic type parsing

This closes #1744


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23dc2a4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23dc2a4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23dc2a4a

Branch: refs/heads/release-1.0
Commit: 23dc2a4acf8e886384a66587ff393c2e62a69037
Parents: 43e5975
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 29 19:24:34 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 1 17:24:41 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/typeinfo/TypeHint.java     |  77 ++++++++
 .../api/common/typeinfo/TypeInformation.java    |  40 ++++-
 .../flink/api/common/typeinfo/TypeHintTest.java |  62 +++++++
 .../java/org/apache/flink/api/java/DataSet.java |   1 -
 .../java/operators/SingleInputUdfOperator.java  | 176 ++++++++++---------
 .../api/java/operators/TwoInputUdfOperator.java | 176 +++++++++++--------
 .../datastream/SingleOutputStreamOperator.java  | 156 ++++++++--------
 .../flink/streaming/api/TypeFillTest.java       |  48 +++--
 8 files changed, 479 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
new file mode 100644
index 0000000..975d6e3
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * A utility class for describing generic types. It can be used to obtain a 
type information via:
+ * 
+ * <pre>{@code
+ * TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new 
TypeHint<Tuple2<String, Long>>(){});
+ * }</pre>
+ * or
+ * <pre>{@code
+ * TypeInformation<Tuple2<String, Long>> info = new TypeHint<Tuple2<String, 
Long>>(){}.getTypeInfo();
+ * }</pre>
+ * 
+ * @param <T> The type information to hint.
+ */
+@Public
+public abstract class TypeHint<T> {
+       
+       /** The type information described by the hint */
+       private final TypeInformation<T> typeInfo;
+
+       /**
+        * Creates a hint for the generic type in the class signature.
+        */
+       public TypeHint() {
+               this.typeInfo = TypeExtractor.createTypeInfo(this, 
TypeHint.class, getClass(), 0);
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the type information described by this TypeHint.
+        * @return The type information described by this TypeHint.
+        */
+       public TypeInformation<T> getTypeInfo() {
+               return typeInfo;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return typeInfo.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this || 
+                       obj instanceof TypeHint && 
this.typeInfo.equals(((TypeHint<?>) obj).typeInfo);
+       }
+
+       @Override
+       public String toString() {
+               return "TypeHint: " + typeInfo;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 1c95be0..95eed6b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -22,9 +22,10 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.io.Serializable;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -128,7 +129,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
        @PublicEvolving
        public List<TypeInformation<?>> getGenericParameters() {
                // Return an empty list as the default implementation
-               return new LinkedList<>();
+               return Collections.emptyList();
        }
 
        /**
@@ -175,4 +176,39 @@ public abstract class TypeInformation<T> implements 
Serializable {
         * @return true if obj can be equaled with this, otherwise false
         */
        public abstract boolean canEqual(Object obj);
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a TypeInformation for the type described by the given class.
+        * 
+        * <p>This method only works for non-generic types. For generic types, 
use the
+        * {@link #of(TypeHint)} method.
+        *
+        * @param typeClass The class of the type.
+        * @param <T> The generic type.
+        *
+        * @return The TypeInformation object for the type described by the 
class.
+        */
+       public static <T> TypeInformation<T> of(Class<T> typeClass) {
+               return TypeExtractor.createTypeInfo(typeClass);
+       }
+       
+       /**
+        * Creates a TypeInformation for a generic type via a utility "type 
hint".
+        * This method can be used as follows:
+        * <pre>
+        * {@code
+        * TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new 
TypeHint<Tuple2<String, Long>>(){});
+        * }
+        * </pre>
+        * 
+        * @param typeHint The hint for the generic type.
+        * @param <T> The generic type.
+        *    
+        * @return The TypeInformation object for the type described by the 
hint.
+        */
+       public static <T> TypeInformation<T> of(TypeHint<T> typeHint) {
+               return typeHint.getTypeInfo();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java
new file mode 100644
index 0000000..60232f2
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TypeHintTest {
+       
+       @Test
+       public void testTypeInfoDirect() {
+               
+               // simple (non-generic case)
+               TypeHint<String> stringInfo1 = new TypeHint<String>(){};
+               TypeHint<String> stringInfo2 = new TypeHint<String>(){};
+               
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
stringInfo1.getTypeInfo());
+               
+               assertTrue(stringInfo1.hashCode() == stringInfo2.hashCode());
+               assertTrue(stringInfo1.equals(stringInfo2));
+               
assertTrue(stringInfo1.toString().equals(stringInfo2.toString()));
+
+               // generic case
+               TypeHint<Tuple3<String, Double, Boolean>> generic = new 
TypeHint<Tuple3<String, Double, Boolean>>(){};
+               
+               TypeInformation<Tuple3<String, Double, Boolean>> tupleInfo = 
+                               new 
TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
BasicTypeInfo.BOOLEAN_TYPE_INFO);
+               
+               assertEquals(tupleInfo, generic.getTypeInfo());
+       }
+
+       @Test
+       public void testTypeInfoOf() {
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(String.class));
+               assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(new TypeHint<String>(){}));
+               
+
+               TypeInformation<Tuple3<String, Double, Boolean>> tupleInfo =
+                               new 
TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+               assertEquals(tupleInfo, TypeInformation.of(new 
TypeHint<Tuple3<String, Double, Boolean>>(){}));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index c315920..b186c3c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -104,7 +104,6 @@ import java.util.List;
  *
  * @param <T> The type of the DataSet, i.e., the type of the elements of the 
DataSet.
  */
-
 @Public
 public abstract class DataSet<T> {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index 22cf089..eb485fe 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -26,18 +26,21 @@ import java.util.Set;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * The <tt>SingleInputUdfOperator</tt> is the base class of all unary 
operators that execute
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are 
naturally UDFs that
@@ -185,6 +188,92 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
                O returnType = (O) this;
                return returnType;
        }
+
+       // 
------------------------------------------------------------------------
+       //  type hinting
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Adds a type information hint about the return type of this operator. 
This method
+        * can be used in cases where Flink cannot determine automatically what 
the produced
+        * type of a function is. That can be the case if the function uses 
generic type variables
+        * in the return type that cannot be inferred from the input type.
+        *
+        * <p>Classes can be used as type hints for non-generic types (classes 
without generic parameters),
+        * but not for generic types like for example Tuples. For those generic 
types, please
+        * use the {@link #returns(TypeHint)} method.
+        * 
+        * <p>Use this method the following way:
+        * <pre>{@code
+        *     DataSet<String[]> result =
+        *         data.flatMap(new FunctionWithNonInferrableReturnType())
+        *             .returns(String[].class);
+        * }</pre>
+        *
+        * @param typeClass The class of the returned data type.
+        * @return This operator with the type information corresponding to the 
given type class.
+        */
+       public O returns(Class<OUT> typeClass) {
+               requireNonNull(typeClass, "type class must not be null");
+               
+               try {
+                       return returns(TypeInformation.of(typeClass));
+               }
+               catch (InvalidTypesException e) {
+                       throw new InvalidTypesException("Cannot infer the type 
information from the class alone. " +
+                                       "This is most likely because the class 
represents a generic type. In that case, " +
+                                       "please use the 'returns(TypeHint)' 
method instead.", e);
+               }
+       }
+       
+       /**
+        * Adds a type information hint about the return type of this operator. 
This method
+        * can be used in cases where Flink cannot determine automatically what 
the produced
+        * type of a function is. That can be the case if the function uses 
generic type variables
+        * in the return type that cannot be inferred from the input type.
+        *
+        * <p>Use this method the following way:
+        * <pre>{@code
+        *     DataSet<Tuple2<String, Double>> result =
+        *         data.flatMap(new FunctionWithNonInferrableReturnType())
+        *             .returns(new TypeHint<Tuple2<String, Double>>(){});
+        * }</pre>
+        *
+        * @param typeHint The type hint for the returned data type.
+        * @return This operator with the type information corresponding to the 
given type hint.
+        */
+       public O returns(TypeHint<OUT> typeHint) {
+               requireNonNull(typeHint, "TypeHint must not be null");
+       
+               try {
+                       return returns(TypeInformation.of(typeHint));
+               }
+               catch (InvalidTypesException e) {
+                       throw new InvalidTypesException("Cannot infer the type 
information from the type hint. " +
+                                       "Make sure that the TypeHint does not 
use any generic type variables.", e);
+               }
+       }
+
+       /**
+        * Adds a type information hint about the return type of this operator. 
This method
+        * can be used in cases where Flink cannot determine automatically what 
the produced
+        * type of a function is. That can be the case if the function uses 
generic type variables
+        * in the return type that cannot be inferred from the input type.
+        * 
+        * <p>In most cases, the methods {@link #returns(Class)} and {@link 
#returns(TypeHint)}
+        * are preferable.
+        *
+        * @param typeInfo The type information for the returned data type.
+        * @return This operator using the given type information for the 
return type.
+        */
+       public O returns(TypeInformation<OUT> typeInfo) {
+               requireNonNull(typeInfo, "TypeInformation must not be null");
+               
+               fillInType(typeInfo);
+               @SuppressWarnings("unchecked")
+               O returnType = (O) this;
+               return returnType;
+       }
        
        /**
         * Adds a type information hint about the return type of this operator. 
@@ -220,7 +309,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
         * @param typeInfoString
         *            type information string to be parsed
         * @return This operator with a given return type hint.
+        * 
+        * @deprecated Please use {@link #returns(Class)} or {@link 
#returns(TypeHint)} instead.
         */
+       @Deprecated
+       @PublicEvolving
        public O returns(String typeInfoString) {
                if (typeInfoString == null) {
                        throw new IllegalArgumentException("Type information 
string must not be null.");
@@ -228,78 +321,6 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
                return returns(TypeInfoParser.<OUT>parse(typeInfoString));
        }
        
-       /**
-        * Adds a type information hint about the return type of this operator. 
-        * 
-        * <p>
-        * Type hints are important in cases where the Java compiler
-        * throws away generic type information necessary for efficient 
execution.
-        * 
-        * <p>
-        * This method takes an instance of {@link 
org.apache.flink.api.common.typeinfo.TypeInformation} such as:
-        * 
-        * <ul>
-        * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
-        * <li>{@link 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
-        * <li>etc.</li>
-        * </ul>
-        *
-        * @param typeInfo
-        *            type information as a return type hint
-        * @return This operator with a given return type hint.
-        */
-       public O returns(TypeInformation<OUT> typeInfo) {
-               if (typeInfo == null) {
-                       throw new IllegalArgumentException("Type information 
must not be null.");
-               }
-               fillInType(typeInfo);
-               @SuppressWarnings("unchecked")
-               O returnType = (O) this;
-               return returnType;
-       }
-       
-       /**
-        * Adds a type information hint about the return type of this operator. 
-        * 
-        * <p>
-        * Type hints are important in cases where the Java compiler
-        * throws away generic type information necessary for efficient 
execution.
-        * 
-        * <p>
-        * This method takes a class that will be analyzed by Flink's type 
extraction capabilities.
-        * 
-        * <p>
-        * Examples for classes are:
-        * <ul>
-        * <li>Basic types such as <code>Integer.class</code>, 
<code>String.class</code>, etc.</li>
-        * <li>POJOs such as <code>MyPojo.class</code></li>
-        * <li>Classes that <b>extend</b> tuples. Classes like 
<code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> 
sufficient.</li>
-        * <li>Arrays such as <code>String[].class</code>, etc.</li>
-        * </ul>
-        *
-        * @param typeClass
-        *            class as a return type hint
-        * @return This operator with a given return type hint.
-        */
-       @SuppressWarnings("unchecked")
-       public O returns(Class<OUT> typeClass) {
-               if (typeClass == null) {
-                       throw new IllegalArgumentException("Type class must not 
be null.");
-               }
-               
-               try {
-                       TypeInformation<OUT> ti = (TypeInformation<OUT>) 
TypeExtractor.createTypeInfo(typeClass);
-                       return returns(ti);
-               }
-               catch (InvalidTypesException e) {
-                       throw new InvalidTypesException("The given class is not 
suited for providing necessary type information.", e);
-               }
-       }
-
        // 
--------------------------------------------------------------------------------------------
        // Accessors
        // 
--------------------------------------------------------------------------------------------
@@ -360,12 +381,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O 
extends SingleInputUdfOp
        }
 
        protected boolean udfWithForwardedFieldsAnnotation(Class<?> udfClass) {
-
-               if 
(udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null ||
-                               
udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return 
udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null ||
+                               
udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 34a5518..695ed3a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -26,18 +26,21 @@ import java.util.Set;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators 
that execute
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are 
naturally UDFs that
@@ -262,14 +265,102 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, 
O extends TwoInputUdfOp
                O returnType = (O) this;
                return returnType;
        }
-       
+
+       // 
------------------------------------------------------------------------
+       //  type hinting
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Adds a type information hint about the return type of this operator. 
This method
+        * can be used in cases where Flink cannot determine automatically what 
the produced
+        * type of a function is. That can be the case if the function uses 
generic type variables
+        * in the return type that cannot be inferred from the input type.
+        *
+        * <p>Classes can be used as type hints for non-generic types (classes 
without generic parameters),
+        * but not for generic types like for example Tuples. For those generic 
types, please
+        * use the {@link #returns(TypeHint)} method.
+        *
+        * <p>Use this method the following way:
+        * <pre>{@code
+        *     DataSet<String[]> result = 
+        *         data1.join(data2).where("id").equalTo("fieldX")
+        *              .with(new JoinFunctionWithNonInferrableReturnType())
+        *              .returns(String[].class);
+        * }</pre>
+        *
+        * @param typeClass The class of the returned data type.
+        * @return This operator with the type information corresponding to the 
given type class.
+        */
+       public O returns(Class<OUT> typeClass) {
+               requireNonNull(typeClass, "type class must not be null");
+
+               try {
+                       return returns(TypeInformation.of(typeClass));
+               }
+               catch (InvalidTypesException e) {
+                       throw new InvalidTypesException("Cannot infer the type 
information from the class alone. " +
+                                       "This is most likely because the class 
represents a generic type. In that case, " +
+                                       "please use the 'returns(TypeHint)' 
method instead.", e);
+               }
+       }
+
+       /**
+        * Adds a type information hint about the return type of this operator. 
This method
+        * can be used in cases where Flink cannot determine automatically what 
the produced
+        * type of a function is. That can be the case if the function uses 
generic type variables
+        * in the return type that cannot be inferred from the input type.
+        *
+        * <p>Use this method the following way:
+        * <pre>{@code
+        *     DataSet<Tuple2<String, Double>> result = 
+        *         data1.join(data2).where("id").equalTo("fieldX")
+        *              .with(new JoinFunctionWithNonInferrableReturnType())
+        *              .returns(new TypeHint<Tuple2<String, Double>>(){});
+        * }</pre>
+        *
+        * @param typeHint The type hint for the returned data type.
+        * @return This operator with the type information corresponding to the 
given type hint.
+        */
+       public O returns(TypeHint<OUT> typeHint) {
+               requireNonNull(typeHint, "TypeHint must not be null");
+
+               try {
+                       return returns(TypeInformation.of(typeHint));
+               }
+               catch (InvalidTypesException e) {
+                       throw new InvalidTypesException("Cannot infer the type 
information from the type hint. " +
+                                       "Make sure that the TypeHint does not 
use any generic type variables.", e);
+               }
+       }
+
+       /**
+        * Adds a type information hint about the return type of this operator. 
This method
+        * can be used in cases where Flink cannot determine automatically what 
the produced
+        * type of a function is. That can be the case if the function uses 
generic type variables
+        * in the return type that cannot be inferred from the input type.
+        *
+        * <p>In most cases, the methods {@link #returns(Class)} and {@link 
#returns(TypeHint)}
+        * are preferable.
+        *
+        * @param typeInfo The type information for the returned data type.
+        * @return This operator using the given type information for the 
return type.
+        */
+       public O returns(TypeInformation<OUT> typeInfo) {
+               requireNonNull(typeInfo, "TypeInformation must not be null");
+
+               fillInType(typeInfo);
+               @SuppressWarnings("unchecked")
+               O returnType = (O) this;
+               return returnType;
+       }
+
        /**
         * Adds a type information hint about the return type of this operator. 
-        * 
+        *
         * <p>
         * Type hints are important in cases where the Java compiler
         * throws away generic type information necessary for efficient 
execution.
-        * 
+        *
         * <p>
         * This method takes a type information string that will be parsed. A 
type information string can contain the following
         * types:
@@ -297,86 +388,17 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, 
O extends TwoInputUdfOp
         * @param typeInfoString
         *            type information string to be parsed
         * @return This operator with a given return type hint.
+        *
+        * @deprecated Please use {@link #returns(Class)} or {@link 
#returns(TypeHint)} instead.
         */
+       @Deprecated
+       @PublicEvolving
        public O returns(String typeInfoString) {
                if (typeInfoString == null) {
                        throw new IllegalArgumentException("Type information 
string must not be null.");
                }
                return returns(TypeInfoParser.<OUT>parse(typeInfoString));
        }
-       
-       /**
-        * Adds a type information hint about the return type of this operator. 
-        * 
-        * <p>
-        * Type hints are important in cases where the Java compiler
-        * throws away generic type information necessary for efficient 
execution.
-        * 
-        * <p>
-        * This method takes an instance of {@link 
org.apache.flink.api.common.typeinfo.TypeInformation} such as:
-        * 
-        * <ul>
-        * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
-        * <li>{@link 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
-        * <li>etc.</li>
-        * </ul>
-        *
-        * @param typeInfo
-        *            type information as a return type hint
-        * @return This operator with a given return type hint.
-        */
-       public O returns(TypeInformation<OUT> typeInfo) {
-               if (typeInfo == null) {
-                       throw new IllegalArgumentException("Type information 
must not be null.");
-               }
-               fillInType(typeInfo);
-               
-               @SuppressWarnings("unchecked")
-               O returnType = (O) this;
-               return returnType;
-       }
-       
-       /**
-        * Adds a type information hint about the return type of this operator. 
-        * 
-        * <p>
-        * Type hints are important in cases where the Java compiler
-        * throws away generic type information necessary for efficient 
execution.
-        * 
-        * <p>
-        * This method takes a class that will be analyzed by Flink's type 
extraction capabilities.
-        * 
-        * <p>
-        * Examples for classes are:
-        * <ul>
-        * <li>Basic types such as <code>Integer.class</code>, 
<code>String.class</code>, etc.</li>
-        * <li>POJOs such as <code>MyPojo.class</code></li>
-        * <li>Classes that <b>extend</b> tuples. Classes like 
<code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> 
sufficient.</li>
-        * <li>Arrays such as <code>String[].class</code>, etc.</li>
-        * </ul>
-        *
-        * @param typeClass
-        *            class as a return type hint
-        * @return This operator with a given return type hint.
-        */
-       @SuppressWarnings("unchecked")
-       public O returns(Class<OUT> typeClass) {
-               if (typeClass == null) {
-                       throw new IllegalArgumentException("Type class must not 
be null.");
-               }
-               
-               try {
-                       TypeInformation<OUT> ti = (TypeInformation<OUT>) 
TypeExtractor.createTypeInfo(typeClass);
-                       return returns(ti);
-               }
-               catch (InvalidTypesException e) {
-                       throw new InvalidTypesException("The given class is not 
suited for providing necessary type information.", e);
-               }
-       }
 
        // 
--------------------------------------------------------------------------------------------
        // Accessors

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 5846c26..2c7b5cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -29,6 +29,8 @@ import 
org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * {@code SingleOutputStreamOperator} represents a user defined transformation
  * applied on a {@link DataStream} with one predefined output type.
@@ -152,6 +154,83 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
                return setChainingStrategy(ChainingStrategy.HEAD);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Type hinting
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Adds a type information hint about the return type of this operator. This method can be used in cases
+        * where Flink cannot determine automatically what the produced type of a function is. That can be the case
+        * if the function uses generic type variables in the return type that cannot be inferred from the input type.
+        *
+        * <p>Classes can be used as type hints for non-generic types (classes without generic parameters),
+        * but not for generic types like for example Tuples. For those generic types, please
+        * use the {@link #returns(TypeHint)} method.
+        *
+        * @param typeClass The class of the returned data type; must not be null.
+        * @return This operator with the type information corresponding to the given type class.
+        * @throws InvalidTypesException If no type information can be derived from the class alone; the original failure is attached as the cause.
+        */
+       public SingleOutputStreamOperator<T> returns(Class<T> typeClass) {
+               requireNonNull(typeClass, "type class must not be null.");
+
+               try {
+                       return returns(TypeInformation.of(typeClass));
+               }
+               catch (InvalidTypesException e) {
+                       throw new InvalidTypesException("Cannot infer the type information from the class alone. " +
+                                       "This is most likely because the class represents a generic type. In that case, " +
+                                       "please use the 'returns(TypeHint)' method instead.", e);
+               }
+       }
+
+       /**
+        * Adds a type information hint about the return type of this operator. This method can be used in cases
+        * where Flink cannot determine automatically what the produced type of a function is. That can be the case
+        * if the function uses generic type variables in the return type that cannot be inferred from the input type.
+        *
+        * <p>Use this method the following way:
+        * <pre>{@code
+        *     DataStream<Tuple2<String, Double>> result =
+        *         stream.flatMap(new FunctionWithNonInferrableReturnType())
+        *               .returns(new TypeHint<Tuple2<String, Double>>(){});
+        * }</pre>
+        *
+        * @param typeHint The type hint for the returned data type; must not be null.
+        * @return This operator with the type information corresponding to the given type hint.
+        * @throws InvalidTypesException If no type information can be derived from the hint; the original failure is attached as the cause.
+        */
+       public SingleOutputStreamOperator<T> returns(TypeHint<T> typeHint) {
+               requireNonNull(typeHint, "TypeHint must not be null");
+
+               try {
+                       return returns(TypeInformation.of(typeHint));
+               }
+               catch (InvalidTypesException e) {
+                       throw new InvalidTypesException("Cannot infer the type information from the type hint. " +
+                                       "Make sure that the TypeHint does not use any generic type variables.", e);
+               }
+       }
+
+       /**
+        * Supplies the type information of the elements produced by this operator by setting it as
+        * the output type of the underlying transformation. Use it when Flink cannot work out the
+        * produced type of a function automatically, which can happen when the function declares
+        * generic type variables in its return type that are not derivable from the input type.
+        *
+        * <p>In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)}
+        * are preferable.
+        *
+        * @param typeInfo type information as a return type hint; must not be null
+        * @return This operator with a given return type hint.
+        */
+       public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
+               final TypeInformation<T> outputType = requireNonNull(typeInfo, "TypeInformation must not be null");
+               transformation.setOutputType(outputType);
+
+               return this;
+       }
+       
        /**
         * Adds a type information hint about the return type of this operator. 
         * 
@@ -186,7 +265,11 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
         * @param typeInfoString
         *            type information string to be parsed
         * @return This operator with a given return type hint.
+        * 
+        * @deprecated Please use {@link #returns(Class)} or {@link 
#returns(TypeHint)} instead.
         */
+       @Deprecated
+       @PublicEvolving
        public SingleOutputStreamOperator<T> returns(String typeInfoString) {
                if (typeInfoString == null) {
                        throw new IllegalArgumentException("Type information 
string must not be null.");
@@ -194,74 +277,9 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
                return returns(TypeInfoParser.<T>parse(typeInfoString));
        }
        
-       /**
-        * Adds a type information hint about the return type of this operator. 
-        * 
-        * <p>
-        * Type hints are important in cases where the Java compiler
-        * throws away generic type information necessary for efficient 
execution.
-        * 
-        * <p>
-        * This method takes an instance of {@link 
org.apache.flink.api.common.typeinfo.TypeInformation} such as:
-        * 
-        * <ul>
-        * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
-        * <li>{@link 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
-        * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
-        * <li>etc.</li>
-        * </ul>
-        *
-        * @param typeInfo type information as a return type hint
-        * @return This operator with a given return type hint.
-        */
-       public SingleOutputStreamOperator<T> returns(TypeInformation<T> 
typeInfo) {
-               if (typeInfo == null) {
-                       throw new IllegalArgumentException("Type information 
must not be null.");
-               }
-               transformation.setOutputType(typeInfo);
-               return this;
-       }
-       
-       /**
-        * Adds a type information hint about the return type of this operator. 
-        * 
-        * <p>
-        * Type hints are important in cases where the Java compiler
-        * throws away generic type information necessary for efficient 
execution.
-        * 
-        * <p>
-        * This method takes a class that will be analyzed by Flink's type 
extraction capabilities.
-        * 
-        * <p>
-        * Examples for classes are:
-        * <ul>
-        * <li>Basic types such as <code>Integer.class</code>, 
<code>String.class</code>, etc.</li>
-        * <li>POJOs such as <code>MyPojo.class</code></li>
-        * <li>Classes that <b>extend</b> tuples. Classes like 
<code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> 
sufficient.</li>
-        * <li>Arrays such as <code>String[].class</code>, etc.</li>
-        * </ul>
-        *
-        * @param typeClass
-        *            class as a return type hint
-        * @return This operator with a given return type hint.
-        */
-       @SuppressWarnings("unchecked")
-       public SingleOutputStreamOperator<T> returns(Class<T> typeClass) {
-               if (typeClass == null) {
-                       throw new IllegalArgumentException("Type class must not 
be null.");
-               }
-               
-               try {
-                       TypeInformation<T> ti = (TypeInformation<T>) 
TypeExtractor.createTypeInfo(typeClass);
-                       return returns(ti);
-               }
-               catch (InvalidTypesException e) {
-                       throw new InvalidTypesException("The given class is not 
suited for providing necessary type information.", e);
-               }
-       }
+       // 
------------------------------------------------------------------------
+       //  Miscellaneous
+       // 
------------------------------------------------------------------------
 
        @Override
        protected DataStream<T> setConnectionType(StreamPartitioner<T> 
partitioner) {

http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index ddda82d..acbb5b4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -44,36 +45,35 @@ public class TypeFillTest extends 
StreamingMultipleProgramsTestBase {
                try {
                        env.addSource(new TestSource<Integer>()).print();
                        fail();
-               } catch (Exception e) {
-               }
+               } catch (Exception ignored) {}
+               
 
                DataStream<Long> source = env.generateSequence(1, 10);
 
                try {
                        source.map(new TestMap<Long, Long>()).print();
                        fail();
-               } catch (Exception e) {
-               }
+               } catch (Exception ignored) {}
+               
                try {
                        source.flatMap(new TestFlatMap<Long, Long>()).print();
                        fail();
-               } catch (Exception e) {
-               }
+               } catch (Exception ignored) {}
+               
                try {
                        source.connect(source).map(new TestCoMap<Long, Long, 
Integer>()).print();
                        fail();
-               } catch (Exception e) {
-               }
+               } catch (Exception ignored) {}
+               
                try {
                        source.connect(source).flatMap(new TestCoFlatMap<Long, 
Long, Integer>()).print();
                        fail();
-               } catch (Exception e) {
-               }
+               } catch (Exception ignored) {}
 
-               env.addSource(new TestSource<Integer>()).returns("Integer");
+               env.addSource(new TestSource<Integer>()).returns(Integer.class);
                source.map(new TestMap<Long, 
Long>()).returns(Long.class).print();
-               source.flatMap(new TestFlatMap<Long, 
Long>()).returns("Long").print();
-               source.connect(source).map(new TestCoMap<Long, Long, 
Integer>()).returns("Integer").print();
+               source.flatMap(new TestFlatMap<Long, Long>()).returns(new 
TypeHint<Long>(){}).print();
+               source.connect(source).map(new TestCoMap<Long, Long, 
Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print();
                source.connect(source).flatMap(new TestCoFlatMap<Long, Long, 
Integer>())
                                .returns(BasicTypeInfo.INT_TYPE_INFO).print();
                
@@ -90,10 +90,9 @@ public class TypeFillTest extends 
StreamingMultipleProgramsTestBase {
 
                map.print();
                try {
-                       map.returns("String");
+                       map.returns(String.class);
                        fail();
-               } catch (Exception e) {
-               }
+               } catch (Exception ignored) {}
 
        }
 
@@ -101,14 +100,10 @@ public class TypeFillTest extends 
StreamingMultipleProgramsTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void run(SourceContext<T> ctx) throws Exception {
-
-               }
+               public void run(SourceContext<T> ctx) throws Exception {}
 
                @Override
-               public void cancel() {
-
-               }
+               public void cancel() {}
        }
 
        private class TestMap<T, O> implements MapFunction<T, O> {
@@ -120,8 +115,7 @@ public class TypeFillTest extends 
StreamingMultipleProgramsTestBase {
 
        private class TestFlatMap<T, O> implements FlatMapFunction<T, O> {
                @Override
-               public void flatMap(T value, Collector<O> out) throws Exception 
{
-               }
+               public void flatMap(T value, Collector<O> out) throws Exception 
{}
        }
 
        private class TestCoMap<IN1, IN2, OUT> implements CoMapFunction<IN1, 
IN2, OUT> {
@@ -141,12 +135,10 @@ public class TypeFillTest extends 
StreamingMultipleProgramsTestBase {
        private class TestCoFlatMap<IN1, IN2, OUT> implements 
CoFlatMapFunction<IN1, IN2, OUT> {
 
                @Override
-               public void flatMap1(IN1 value, Collector<OUT> out) throws 
Exception {
-               }
+               public void flatMap1(IN1 value, Collector<OUT> out) throws 
Exception {}
 
                @Override
-               public void flatMap2(IN2 value, Collector<OUT> out) throws 
Exception {
-               }
+               public void flatMap2(IN2 value, Collector<OUT> out) throws 
Exception {}
 
        }
 }

Reply via email to