Repository: flink Updated Branches: refs/heads/release-1.0 43e5975d5 -> 23dc2a4ac
[FLINK-2788] [apis] Add TypeHint class to allow type-safe generic type parsing This closes #1744 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23dc2a4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23dc2a4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23dc2a4a Branch: refs/heads/release-1.0 Commit: 23dc2a4acf8e886384a66587ff393c2e62a69037 Parents: 43e5975 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 29 19:24:34 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 1 17:24:41 2016 +0100 ---------------------------------------------------------------------- .../flink/api/common/typeinfo/TypeHint.java | 77 ++++++++ .../api/common/typeinfo/TypeInformation.java | 40 ++++- .../flink/api/common/typeinfo/TypeHintTest.java | 62 +++++++ .../java/org/apache/flink/api/java/DataSet.java | 1 - .../java/operators/SingleInputUdfOperator.java | 176 ++++++++++--------- .../api/java/operators/TwoInputUdfOperator.java | 176 +++++++++++-------- .../datastream/SingleOutputStreamOperator.java | 156 ++++++++-------- .../flink/streaming/api/TypeFillTest.java | 48 +++-- 8 files changed, 479 insertions(+), 257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java new file mode 100644 index 0000000..975d6e3 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +/** + * A utility class for describing generic types. It can be used to obtain type information via: + * + * <pre>{@code + * TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}); + * }</pre> + * or + * <pre>{@code + * TypeInformation<Tuple2<String, Long>> info = new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo(); + * }</pre> + * + * @param <T> The type described by this hint. + */ +@Public +public abstract class TypeHint<T> { + + /** The type information described by the hint. */ + private final TypeInformation<T> typeInfo; + + /** + * Creates a hint for the generic type in the class signature. + */ + public TypeHint() { + this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the type information described by this TypeHint. + * @return The type information described by this TypeHint.
+ */ + public TypeInformation<T> getTypeInfo() { + return typeInfo; + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return typeInfo.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj == this || + obj instanceof TypeHint && this.typeInfo.equals(((TypeHint<?>) obj).typeInfo); + } + + @Override + public String toString() { + return "TypeHint: " + typeInfo; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java index 1c95be0..95eed6b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java @@ -22,9 +22,10 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; import java.io.Serializable; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; /** @@ -128,7 +129,7 @@ public abstract class TypeInformation<T> implements Serializable { @PublicEvolving public List<TypeInformation<?>> getGenericParameters() { // Return an empty list as the default implementation - return new LinkedList<>(); + return Collections.emptyList(); } /** @@ -175,4 +176,39 @@ public abstract class TypeInformation<T> implements Serializable { * @return true if obj can be equaled with this, otherwise false */ public abstract boolean canEqual(Object obj); + + //
------------------------------------------------------------------------ + + /** + * Creates a TypeInformation for the type described by the given class. + * + * <p>This method only works for non-generic types. For generic types, use the + * {@link #of(TypeHint)} method. + * + * @param typeClass The class of the type. + * @param <T> The generic type. + * + * @return The TypeInformation object for the type described by the class. + */ + public static <T> TypeInformation<T> of(Class<T> typeClass) { + return TypeExtractor.createTypeInfo(typeClass); + } + + /** + * Creates a TypeInformation for a generic type via a utility "type hint". + * This method can be used as follows: + * <pre> + * {@code + * TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}); + * } + * </pre> + * + * @param typeHint The hint for the generic type. + * @param <T> The generic type. + * + * @return The TypeInformation object for the type described by the hint. + */ + public static <T> TypeInformation<T> of(TypeHint<T> typeHint) { + return typeHint.getTypeInfo(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java new file mode 100644 index 0000000..60232f2 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TypeHintTest { + + @Test + public void testTypeInfoDirect() { + + // simple (non-generic case) + TypeHint<String> stringInfo1 = new TypeHint<String>(){}; + TypeHint<String> stringInfo2 = new TypeHint<String>(){}; + + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, stringInfo1.getTypeInfo()); + + assertEquals(stringInfo1.hashCode(), stringInfo2.hashCode()); + assertEquals(stringInfo1, stringInfo2); + assertEquals(stringInfo1.toString(), stringInfo2.toString()); + + // generic case + TypeHint<Tuple3<String, Double, Boolean>> generic = new TypeHint<Tuple3<String, Double, Boolean>>(){}; + + TypeInformation<Tuple3<String, Double, Boolean>> tupleInfo = + new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + + assertEquals(tupleInfo, generic.getTypeInfo()); + } + + @Test + public void testTypeInfoOf() { + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(String.class)); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<String>(){})); + + + TypeInformation<Tuple3<String, Double, Boolean>> tupleInfo = + new
TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + + assertEquals(tupleInfo, TypeInformation.of(new TypeHint<Tuple3<String, Double, Boolean>>(){})); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index c315920..b186c3c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -104,7 +104,6 @@ import java.util.List; * * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet. */ - @Public public abstract class DataSet<T> { http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java index 22cf089..eb485fe 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java @@ -26,18 +26,21 @@ import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import
org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import static java.util.Objects.requireNonNull; + /** * The <tt>SingleInputUdfOperator</tt> is the base class of all unary operators that execute * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that @@ -185,6 +188,92 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp O returnType = (O) this; return returnType; } + + // ------------------------------------------------------------------------ + // type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>Classes can be used as type hints for non-generic types (classes without generic parameters), + * but not for generic types like for example Tuples. For those generic types, please + * use the {@link #returns(TypeHint)} method. + * + * <p>Use this method the following way: + * <pre>{@code + * DataSet<String[]> result = + * data.flatMap(new FunctionWithNonInferrableReturnType()) + * .returns(String[].class); + * }</pre> + * + * @param typeClass The class of the returned data type. + * @return This operator with the type information corresponding to the given type class.
+ */ + public O returns(Class<OUT> typeClass) { + requireNonNull(typeClass, "type class must not be null"); + + try { + return returns(TypeInformation.of(typeClass)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the class alone. " + + "This is most likely because the class represents a generic type. In that case, " + + "please use the 'returns(TypeHint)' method instead.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>Use this method the following way: + * <pre>{@code + * DataSet<Tuple2<String, Double>> result = + * data.flatMap(new FunctionWithNonInferrableReturnType()) + * .returns(new TypeHint<Tuple2<String, Double>>(){}); + * }</pre> + * + * @param typeHint The type hint for the returned data type. + * @return This operator with the type information corresponding to the given type hint. + */ + public O returns(TypeHint<OUT> typeHint) { + requireNonNull(typeHint, "TypeHint must not be null"); + + try { + return returns(TypeInformation.of(typeHint)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the type hint. " + + "Make sure that the TypeHint does not use any generic type variables.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type.
+ * + * <p>In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} + * are preferable. + * + * @param typeInfo The type information for the returned data type. + * @return This operator using the given type information for the return type. + */ + public O returns(TypeInformation<OUT> typeInfo) { + requireNonNull(typeInfo, "TypeInformation must not be null"); + + fillInType(typeInfo); + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } /** * Adds a type information hint about the return type of this operator. @@ -220,7 +309,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp * @param typeInfoString * type information string to be parsed * @return This operator with a given return type hint. + * + * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead. */ + @Deprecated + @PublicEvolving public O returns(String typeInfoString) { if (typeInfoString == null) { throw new IllegalArgumentException("Type information string must not be null."); @@ -228,78 +321,6 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp return returns(TypeInfoParser.<OUT>parse(typeInfoString)); } - /** - * Adds a type information hint about the return type of this operator. - * - * <p> - * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution.
- * - * <p> - * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: - * - * <ul> - * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li> - * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li> - * <li>etc.</li> - * </ul> - * - * @param typeInfo - * type information as a return type hint - * @return This operator with a given return type hint. - */ - public O returns(TypeInformation<OUT> typeInfo) { - if (typeInfo == null) { - throw new IllegalArgumentException("Type information must not be null."); - } - fillInType(typeInfo); - @SuppressWarnings("unchecked") - O returnType = (O) this; - return returnType; - } - - /** - * Adds a type information hint about the return type of this operator. - * - * <p> - * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - * <p> - * This method takes a class that will be analyzed by Flink's type extraction capabilities. - * - * <p> - * Examples for classes are: - * <ul> - * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li> - * <li>POJOs such as <code>MyPojo.class</code></li> - * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li> - * <li>Arrays such as <code>String[].class</code>, etc.</li> - * </ul> - * - * @param typeClass - * class as a return type hint - * @return This operator with a given return type hint.
- */ - @SuppressWarnings("unchecked") - public O returns(Class<OUT> typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("Type class must not be null."); - } - - try { - TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass); - return returns(ti); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); - } - } - // -------------------------------------------------------------------------------------------- // Accessors // -------------------------------------------------------------------------------------------- @@ -360,12 +381,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp } protected boolean udfWithForwardedFieldsAnnotation(Class<?> udfClass) { - - if (udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null || - udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null) { - return true; - } else { - return false; - } + return udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null || + udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java index 34a5518..695ed3a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java @@ -26,18 +26,21 @@ import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import
org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import static java.util.Objects.requireNonNull; + /** * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that @@ -262,14 +265,102 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp O returnType = (O) this; return returnType; } - + + // ------------------------------------------------------------------------ + // type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>Classes can be used as type hints for non-generic types (classes without generic parameters), + * but not for generic types like for example Tuples. For those generic types, please + * use the {@link #returns(TypeHint)} method.
+ * + * <p>Use this method the following way: + * <pre>{@code + * DataSet<String[]> result = + * data1.join(data2).where("id").equalTo("fieldX") + * .with(new JoinFunctionWithNonInferrableReturnType()) + * .returns(String[].class); + * }</pre> + * + * @param typeClass The class of the returned data type. + * @return This operator with the type information corresponding to the given type class. + */ + public O returns(Class<OUT> typeClass) { + requireNonNull(typeClass, "type class must not be null"); + + try { + return returns(TypeInformation.of(typeClass)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the class alone. " + + "This is most likely because the class represents a generic type. In that case, " + + "please use the 'returns(TypeHint)' method instead.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>Use this method the following way: + * <pre>{@code + * DataSet<Tuple2<String, Double>> result = + * data1.join(data2).where("id").equalTo("fieldX") + * .with(new JoinFunctionWithNonInferrableReturnType()) + * .returns(new TypeHint<Tuple2<String, Double>>(){}); + * }</pre> + * + * @param typeHint The type hint for the returned data type. + * @return This operator with the type information corresponding to the given type hint. + */ + public O returns(TypeHint<OUT> typeHint) { + requireNonNull(typeHint, "TypeHint must not be null"); + + try { + return returns(TypeInformation.of(typeHint)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the type hint.
" + + "Make sure that the TypeHint does not use any generic type variables.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} + * are preferable. + * + * @param typeInfo The type information for the returned data type. + * @return This operator using the given type information for the return type. + */ + public O returns(TypeInformation<OUT> typeInfo) { + requireNonNull(typeInfo, "TypeInformation must not be null"); + + fillInType(typeInfo); + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } + /** * Adds a type information hint about the return type of this operator. - * + * * <p> * Type hints are important in cases where the Java compiler * throws away generic type information necessary for efficient execution. - * + * * <p> * This method takes a type information string that will be parsed. A type information string can contain the following * types: @@ -297,86 +388,17 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp * @param typeInfoString * type information string to be parsed * @return This operator with a given return type hint. + * + * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead. */ + @Deprecated + @PublicEvolving public O returns(String typeInfoString) { if (typeInfoString == null) { throw new IllegalArgumentException("Type information string must not be null."); } return returns(TypeInfoParser.<OUT>parse(typeInfoString)); } - - /** - * Adds a type information hint about the return type of this operator.
- * - * <p> - * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - * <p> - * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: - * - * <ul> - * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li> - * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li> - * <li>etc.</li> - * </ul> - * - * @param typeInfo - * type information as a return type hint - * @return This operator with a given return type hint. - */ - public O returns(TypeInformation<OUT> typeInfo) { - if (typeInfo == null) { - throw new IllegalArgumentException("Type information must not be null."); - } - fillInType(typeInfo); - - @SuppressWarnings("unchecked") - O returnType = (O) this; - return returnType; - } - - /** - * Adds a type information hint about the return type of this operator. - * - * <p> - * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - * <p> - * This method takes a class that will be analyzed by Flink's type extraction capabilities. - * - * <p> - * Examples for classes are: - * <ul> - * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li> - * <li>POJOs such as <code>MyPojo.class</code></li> - * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc.
are <b>not</b> sufficient.</li> - * <li>Arrays such as <code>String[].class</code>, etc.</li> - * </ul> - * - * @param typeClass - * class as a return type hint - * @return This operator with a given return type hint. - */ - @SuppressWarnings("unchecked") - public O returns(Class<OUT> typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("Type class must not be null."); - } - - try { - TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass); - return returns(ti); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); - } - } // -------------------------------------------------------------------------------------------- // Accessors http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 5846c26..2c7b5cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import static java.util.Objects.requireNonNull; + /** * {@code SingleOutputStreamOperator} represents a user defined transformation * applied on a {@link DataStream} with one predefined output type. @@ -152,6 +154,83 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { return setChainingStrategy(ChainingStrategy.HEAD); } + // ------------------------------------------------------------------------ + // Type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>Classes can be used as type hints for non-generic types (classes without generic parameters), + * but not for generic types like for example Tuples. For those generic types, please + * use the {@link #returns(TypeHint)} method. + * + * @param typeClass The class of the returned data type. + * @return This operator with the type information corresponding to the given type class. + */ + public SingleOutputStreamOperator<T> returns(Class<T> typeClass) { + requireNonNull(typeClass, "type class must not be null."); + + try { + return returns(TypeInformation.of(typeClass)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the class alone. "
+ "This is most likely because the class represents a generic type. In that case, " + + "please use the 'returns(TypeHint)' method instead.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>Use this method the following way: + * <pre>{@code + * DataStream<Tuple2<String, Double>> result = + * stream.flatMap(new FunctionWithNonInferrableReturnType()) + * .returns(new TypeHint<Tuple2<String, Double>>(){}); + * }</pre> + * + * @param typeHint The type hint for the returned data type. + * @return This operator with the type information corresponding to the given type hint. + */ + public SingleOutputStreamOperator<T> returns(TypeHint<T> typeHint) { + requireNonNull(typeHint, "TypeHint must not be null"); + + try { + return returns(TypeInformation.of(typeHint)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the type hint. " + + "Make sure that the TypeHint does not use any generic type variables.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + * <p>In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} + * are preferable. + * + * @param typeInfo type information as a return type hint + * @return This operator with a given return type hint.
+ */ + public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) { + requireNonNull(typeInfo, "TypeInformation must not be null"); + + transformation.setOutputType(typeInfo); + return this; + } + /** * Adds a type information hint about the return type of this operator. * @@ -186,7 +265,11 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { * @param typeInfoString * type information string to be parsed * @return This operator with a given return type hint. + * + * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead. */ + @Deprecated + @PublicEvolving public SingleOutputStreamOperator<T> returns(String typeInfoString) { if (typeInfoString == null) { throw new IllegalArgumentException("Type information string must not be null."); @@ -194,74 +277,9 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { return returns(TypeInfoParser.<T>parse(typeInfoString)); } - /** - * Adds a type information hint about the return type of this operator. - * - * <p> - * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - * <p> - * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: - * - * <ul> - * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li> - * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li> - * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li> - * <li>etc.</li> - * </ul> - * - * @param typeInfo type information as a return type hint - * @return This operator with a given return type hint. 
- */ - public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) { - if (typeInfo == null) { - throw new IllegalArgumentException("Type information must not be null."); - } - transformation.setOutputType(typeInfo); - return this; - } - - /** - * Adds a type information hint about the return type of this operator. - * - * <p> - * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - * <p> - * This method takes a class that will be analyzed by Flink's type extraction capabilities. - * - * <p> - * Examples for classes are: - * <ul> - * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li> - * <li>POJOs such as <code>MyPojo.class</code></li> - * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li> - * <li>Arrays such as <code>String[].class</code>, etc.</li> - * </ul> - * - * @param typeClass - * class as a return type hint - * @return This operator with a given return type hint. 
- */ - @SuppressWarnings("unchecked") - public SingleOutputStreamOperator<T> returns(Class<T> typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("Type class must not be null."); - } - - try { - TypeInformation<T> ti = (TypeInformation<T>) TypeExtractor.createTypeInfo(typeClass); - return returns(ti); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); - } - } + // ------------------------------------------------------------------------ + // Miscellaneous + // ------------------------------------------------------------------------ @Override protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) { http://git-wip-us.apache.org/repos/asf/flink/blob/23dc2a4a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index ddda82d..acbb5b4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.fail; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -44,36 +45,35 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase { try { env.addSource(new 
TestSource<Integer>()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + DataStream<Long> source = env.generateSequence(1, 10); try { source.map(new TestMap<Long, Long>()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + try { source.flatMap(new TestFlatMap<Long, Long>()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + try { source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + try { source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} - env.addSource(new TestSource<Integer>()).returns("Integer"); + env.addSource(new TestSource<Integer>()).returns(Integer.class); source.map(new TestMap<Long, Long>()).returns(Long.class).print(); - source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print(); - source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print(); + source.flatMap(new TestFlatMap<Long, Long>()).returns(new TypeHint<Long>(){}).print(); + source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print(); source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()) .returns(BasicTypeInfo.INT_TYPE_INFO).print(); @@ -90,10 +90,9 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase { map.print(); try { - map.returns("String"); + map.returns(String.class); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} } @@ -101,14 +100,10 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase { private static final long serialVersionUID = 1L; @Override - public void run(SourceContext<T> ctx) throws Exception { - - } + public void run(SourceContext<T> ctx) throws Exception {} @Override - public void cancel() { - - } + public void cancel() {} } 
private class TestMap<T, O> implements MapFunction<T, O> { @@ -120,8 +115,7 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase { private class TestFlatMap<T, O> implements FlatMapFunction<T, O> { @Override - public void flatMap(T value, Collector<O> out) throws Exception { - } + public void flatMap(T value, Collector<O> out) throws Exception {} } private class TestCoMap<IN1, IN2, OUT> implements CoMapFunction<IN1, IN2, OUT> { @@ -141,12 +135,10 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase { private class TestCoFlatMap<IN1, IN2, OUT> implements CoFlatMapFunction<IN1, IN2, OUT> { @Override - public void flatMap1(IN1 value, Collector<OUT> out) throws Exception { - } + public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {} @Override - public void flatMap2(IN2 value, Collector<OUT> out) throws Exception { - } + public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {} } }