Repository: flink Updated Branches: refs/heads/master 7f99a0df6 -> e30066dbd
[FLINK-7452] [types] Add helper methods for all built-in Flink types to Types This closes #4612. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30066db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30066db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30066db Branch: refs/heads/master Commit: e30066dbd1ebf3c5780df89d766554042c8345a7 Parents: 7f99a0d Author: twalthr <twal...@apache.org> Authored: Mon Aug 28 14:13:07 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Dec 19 09:33:29 2017 +0100 ---------------------------------------------------------------------- docs/dev/table/sql.md | 6 +- docs/dev/table/tableApi.md | 6 +- .../apache/flink/orc/OrcTableSourceTest.java | 2 +- .../apache/flink/api/common/typeinfo/Types.java | 419 ++++++++++++++++++- .../api/java/typeutils/TypeInfoParser.java | 6 +- .../org/apache/flink/table/api/Types.scala | 141 +++++-- .../CorrelateStringExpressionTest.scala | 3 +- .../flink/api/scala/typeutils/Types.scala | 388 +++++++++++++++++ 8 files changed, 907 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index e5de70a..04d6e84 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -821,9 +821,9 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. 
Internal | `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` | | `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` | | `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` | -| `Types.DATE` | `DATE` | `java.sql.Date` | -| `Types.TIME` | `TIME` | `java.sql.Time` | -| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` | +| `Types.SQL_DATE` | `DATE` | `java.sql.Date` | +| `Types.SQL_TIME` | `TIME` | `java.sql.Time` | +| `Types.SQL_TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` | | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` | | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` | | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` | http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/docs/dev/table/tableApi.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 8b6fa72..1cf2a0c 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1559,9 +1559,9 @@ The Table API is built on top of Flink's DataSet and DataStream APIs. Internally | `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` | | `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` | | `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` | -| `Types.DATE` | `DATE` | `java.sql.Date` | -| `Types.TIME` | `TIME` | `java.sql.Time` | -| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` | +| `Types.SQL_DATE` | `DATE` | `java.sql.Date` | +| `Types.SQL_TIME` | `TIME` | `java.sql.Time` | +| `Types.SQL_TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` | | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` | | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` | | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. 
`int[]` | http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java index 4e4be77..f65faf3 100644 --- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java @@ -108,7 +108,7 @@ public class OrcTableSourceTest { assertTrue(returnType instanceof RowTypeInfo); RowTypeInfo rowType = (RowTypeInfo) returnType; - RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()); + TypeInformation<Row> expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()); assertEquals(expected, rowType); } http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java index e19cdd8..9259064 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java @@ -19,56 +19,431 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import 
org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * <p>In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or cases where automatic type inference results in an inefficient type. + * + * <p>Please note that the Scala API and Table API have dedicated Types classes. + * (See <code>org.apache.flink.api.scala.Types</code> and <code>org.apache.flink.table.api.Types</code>) + * + * <p>A more convenient alternative might be a {@link TypeHint}. 
+ * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** + * Returns type information for {@link java.lang.Void}. Does not support a null value. + */ + public static final TypeInformation<Void> VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** + * Returns type information for {@link java.lang.String}. Supports a null value. + */ + public static final TypeInformation<String> STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>byte</code> and {@link java.lang.Byte}. + * Does not support a null value. + */ + public static final TypeInformation<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>boolean</code> and {@link java.lang.Boolean}. + * Does not support a null value. + */ + public static final TypeInformation<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>short</code> and {@link java.lang.Short}. + * Does not support a null value. 
+ */ + public static final TypeInformation<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>int</code> and {@link java.lang.Integer}. + * Does not support a null value. + */ + public static final TypeInformation<Integer> INT = BasicTypeInfo.INT_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>long</code> and {@link java.lang.Long}. + * Does not support a null value. + */ + public static final TypeInformation<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>float</code> and {@link java.lang.Float}. + * Does not support a null value. + */ + public static final TypeInformation<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; + + /** + * Returns type information for both a primitive <code>double</code> and {@link java.lang.Double}. + * Does not support a null value. + */ + public static final TypeInformation<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final SqlTimeTypeInfo<Date> SQL_DATE = SqlTimeTypeInfo.DATE; - public static final SqlTimeTypeInfo<Time> SQL_TIME = SqlTimeTypeInfo.TIME; - public static final SqlTimeTypeInfo<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP; + /** + * Returns type information for both a primitive <code>char</code> and {@link java.lang.Character}. + * Does not support a null value. + */ + public static final TypeInformation<Character> CHAR = BasicTypeInfo.CHAR_TYPE_INFO; /** - * Generates a RowTypeInfo with fields of the given types. - * The fields have the default names (f0, f1, f2 ..). + * Returns type information for {@link java.math.BigDecimal}. Supports a null value. + */ + public static final TypeInformation<BigDecimal> BIG_DEC = BasicTypeInfo.BIG_DEC_TYPE_INFO; + + /** + * Returns type information for {@link java.math.BigInteger}. Supports a null value. 
+ */ + public static final TypeInformation<BigInteger> BIG_INT = BasicTypeInfo.BIG_INT_TYPE_INFO; + + /** + * Returns type information for {@link java.sql.Date}. Supports a null value. + */ + public static final TypeInformation<Date> SQL_DATE = SqlTimeTypeInfo.DATE; + + /** + * Returns type information for {@link java.sql.Time}. Supports a null value. + */ + public static final TypeInformation<Time> SQL_TIME = SqlTimeTypeInfo.TIME; + + /** + * Returns type information for {@link java.sql.Timestamp}. Supports a null value. + */ + public static final TypeInformation<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP; + + /** + * Returns type information for {@link org.apache.flink.types.Row} with fields of the given types. + * A row itself must not be null. + * + * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null regardless of the field's type. + * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. + * + * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances + * must strictly adhere to the schema defined by the type info. * - * <p>This method is a shortcut to {@code new RowTypeInfo(types)}. + * <p>This method generates type information with fields of the given types; the fields have + * the default names (f0, f1, f2 ..). * * @param types The types of the row fields, e.g., Types.STRING, Types.INT */ - public static RowTypeInfo ROW(TypeInformation<?>... types) { + public static TypeInformation<Row> ROW(TypeInformation<?>... types) { return new RowTypeInfo(types); } /** - * Generates a RowTypeInfo with fields of the given types and with given names. - * + * Returns type information for {@link org.apache.flink.types.Row} with fields of the given types and + * with given names. A row must not be null. 
+ * + * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null independent of the field's type. + * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. + * + * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances + * must strictly adhere to the schema defined by the type info. + * * <p>Example use: {@code ROW_NAMED(new String[]{"name", "number"}, Types.STRING, Types.INT)}. - * - * <p>This method is identical to {@code new RowTypeInfo(types, names)}. * * @param fieldNames array of field names * @param types array of field types */ - public static RowTypeInfo ROW_NAMED(String[] fieldNames, TypeInformation<?>... types) { + public static TypeInformation<Row> ROW_NAMED(String[] fieldNames, TypeInformation<?>... types) { return new RowTypeInfo(types, fieldNames); } + + /** + * Returns type information for subclasses of Flink's {@link org.apache.flink.api.java.tuple.Tuple} + * (namely {@link org.apache.flink.api.java.tuple.Tuple0} till {@link org.apache.flink.api.java.tuple.Tuple25}) + * with fields of the given types. A tuple must not be null. + * + * <p>A tuple is a fixed-length composite type for storing multiple values in a + * deterministic field order. Fields of a tuple are typed. Tuples are the most efficient composite + * type; a tuple does not support null-valued fields unless the type of the field supports nullability. + * + * @param types The types of the tuple fields, e.g., Types.STRING, Types.INT + */ + public static <T extends Tuple> TypeInformation<T> TUPLE(TypeInformation<?>... types) { + return new TupleTypeInfo<>(types); + } + + /** + * Returns type information for typed subclasses of Flink's {@link org.apache.flink.api.java.tuple.Tuple}. 
+ * Typed subclasses are classes that extend {@link org.apache.flink.api.java.tuple.Tuple0} till + * {@link org.apache.flink.api.java.tuple.Tuple25} to provide types for all fields and might add + * additional getters and setters for better readability. Additional member fields must not be added. + * A tuple must not be null. + * + * <p>A tuple is a fixed-length composite type for storing multiple values in a + * deterministic field order. Fields of a tuple are typed. Tuples are the most efficient composite + * type; a tuple does not support null-valued fields unless the type of the field supports nullability. + * + * <p>The generic types for all fields of the tuple can be defined in a hierarchy of subclasses. + * + * <p>If Flink's type analyzer is unable to extract a tuple type information with + * type information for all fields, an {@link org.apache.flink.api.common.functions.InvalidTypesException} + * is thrown. + * + * <p>Example use: + * <pre> + * {@code + * class MyTuple extends Tuple2<Integer, String> { + * + * public int getId() { return f0; } + * + * public String getName() { return f1; } + * } + * } + * + * Types.TUPLE(MyTuple.class) + * </pre> + * + * @param tupleSubclass A subclass of {@link org.apache.flink.api.java.tuple.Tuple0} till + * {@link org.apache.flink.api.java.tuple.Tuple25} that defines all field types and + * does not add any additional fields + */ + public static <T extends Tuple> TypeInformation<T> TUPLE(Class<T> tupleSubclass) { + final TypeInformation<T> ti = TypeExtractor.createTypeInfo(tupleSubclass); + if (ti instanceof TupleTypeInfo) { + return ti; + } + throw new InvalidTypesException("Tuple type expected but was: " + ti); + } + + /** + * Returns type information for a POJO (Plain Old Java Object). + * + * <p>A POJO class is public and standalone (no non-static inner class). It has a public no-argument + * constructor. 
All non-static, non-transient fields in the class (and all superclasses) are either public + * (and non-final) or have a public getter and a setter method that follows the Java beans naming + * conventions for getters and setters. + * + * <p>A POJO is a fixed-length and null-aware composite type. Every field can be null independent + * of the field's type. + * + * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses. + * + * <p>If Flink's type analyzer is unable to extract a valid POJO type information with + * type information for all fields, an {@link org.apache.flink.api.common.functions.InvalidTypesException} + * is thrown. Alternatively, you can use {@link Types#POJO(Class, Map)} to specify all fields manually. + * + * @param pojoClass POJO class to be analyzed by Flink + */ + public static <T> TypeInformation<T> POJO(Class<T> pojoClass) { + final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoClass); + if (ti instanceof PojoTypeInfo) { + return ti; + } + throw new InvalidTypesException("POJO type expected but was: " + ti); + } + + /** + * Returns type information for a POJO (Plain Old Java Object) and allows to specify all fields manually. + * + * <p>A POJO class is public and standalone (no non-static inner class). It has a public no-argument + * constructor. All non-static, non-transient fields in the class (and all superclasses) are either public + * (and non-final) or have a public getter and a setter method that follows the Java beans naming + * conventions for getters and setters. + * + * <p>A POJO is a fixed-length, null-aware composite type with non-deterministic field order. Every field + * can be null independent of the field's type. + * + * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses. + * + * <p>If Flink's type analyzer is unable to extract a POJO field, an + * {@link org.apache.flink.api.common.functions.InvalidTypesException} is thrown. 
+ * + * <p><strong>Note:</strong> In most cases the type information of fields can be determined automatically, + * we recommend to use {@link Types#POJO(Class)}. + * + * @param pojoClass POJO class + * @param fields map of fields that map a name to type information. The map key is the name of + * the field and the value is its type. + */ + public static <T> TypeInformation<T> POJO(Class<T> pojoClass, Map<String, TypeInformation<?>> fields) { + final List<PojoField> pojoFields = new ArrayList<>(fields.size()); + for (Map.Entry<String, TypeInformation<?>> field : fields.entrySet()) { + final Field f = TypeExtractor.getDeclaredField(pojoClass, field.getKey()); + if (f == null) { + throw new InvalidTypesException("Field '" + field.getKey() + "'could not be accessed."); + } + pojoFields.add(new PojoField(f, field.getValue())); + } + + return new PojoTypeInfo<>(pojoClass, pojoFields); + } + + /** + * Returns generic type information for any Java object. The serialization logic will + * use the general purpose serializer Kryo. + * + * <p>Generic types are black-boxes for Flink, but allow any object and null values in fields. + * + * <p>By default, serialization of this type is not very efficient. Please read the documentation + * about how to improve efficiency (namely by pre-registering classes). + * + * @param genericClass any Java class + */ + public static <T> TypeInformation<T> GENERIC(Class<T> genericClass) { + return new GenericTypeInfo<>(genericClass); + } + + /** + * Returns type information for Java arrays of primitive type (such as <code>byte[]</code>). The array + * must not be null. + * + * @param elementType element type of the array (e.g. 
Types.BOOLEAN, Types.INT, Types.DOUBLE) + */ + public static TypeInformation<?> PRIMITIVE_ARRAY(TypeInformation<?> elementType) { + if (elementType == BOOLEAN) { + return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == BYTE) { + return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == SHORT) { + return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == INT) { + return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == LONG) { + return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == FLOAT) { + return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == DOUBLE) { + return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO; + } else if (elementType == CHAR) { + return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO; + } + throw new IllegalArgumentException("Invalid element type for a primitive array."); + } + + /** + * Returns type information for Java arrays of object types (such as <code>String[]</code>, + * <code>Integer[]</code>). The array itself must not be null. Null values for elements are supported. + * + * @param elementType element type of the array + */ + @SuppressWarnings("unchecked") + public static <E> TypeInformation<E[]> OBJECT_ARRAY(TypeInformation<E> elementType) { + if (elementType == Types.STRING) { + return (TypeInformation) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO; + } + return ObjectArrayTypeInfo.getInfoFor(elementType); + } + + /** + * Returns type information for Flink value types (classes that implement + * {@link org.apache.flink.types.Value}). Built-in value types do not support null values (except + * for {@link org.apache.flink.types.StringValue}). + * + * <p>Value types describe their serialization and deserialization manually. Instead of going + * through a general purpose serialization framework. 
A value type is reasonable when general purpose + * serialization would be highly inefficient. The wrapped value can be altered, allowing programmers to + * reuse objects and take pressure off the garbage collector. + * + * <p>Flink provides built-in value types for all Java primitive types (such as + * {@link org.apache.flink.types.BooleanValue}, {@link org.apache.flink.types.IntValue}) as well + * as {@link org.apache.flink.types.StringValue}, {@link org.apache.flink.types.NullValue}, + * {@link org.apache.flink.types.ListValue}, and {@link org.apache.flink.types.MapValue}. + * + * @param valueType class that implements {@link org.apache.flink.types.Value} + */ + public static <V extends Value> TypeInformation<V> VALUE(Class<V> valueType) { + return new ValueTypeInfo<>(valueType); + } + + /** + * Returns type information for a Java {@link java.util.Map}. A map must not be null. Null values + * in keys are not supported. An entry's value can be null. + * + * <p>By default, maps are untyped and treated as a generic type in Flink; therefore, it is useful + * to pass type information whenever a map is used. + * + * <p><strong>Note:</strong> Flink does not preserve the concrete {@link Map} type. It converts a map into {@link HashMap} when + * copying or deserializing. + * + * @param keyType type information for the map's keys + * @param valueType type information for the map's values + */ + public static <K, V> TypeInformation<Map<K, V>> MAP(TypeInformation<K> keyType, TypeInformation<V> valueType) { + return new MapTypeInfo<>(keyType, valueType); + } + + /** + * Returns type information for a Java {@link java.util.List}. A list must not be null. Null values + * in elements are not supported. + * + * <p>By default, lists are untyped and treated as a generic type in Flink; therefore, it is useful + * to pass type information whenever a list is used. + * + * <p><strong>Note:</strong> Flink does not preserve the concrete {@link List} type. 
It converts a list into {@link ArrayList} when + * copying or deserializing. + * + * @param elementType type information for the list's elements + */ + public static <E> TypeInformation<List<E>> LIST(TypeInformation<E> elementType) { + return new ListTypeInfo<>(elementType); + } + + /** + * Returns type information for Java enumerations. Null values are not supported. + * + * @param enumType enumeration class extending {@link java.lang.Enum} + */ + public static <E extends Enum<E>> TypeInformation<E> ENUM(Class<E> enumType) { + return new EnumTypeInfo<>(enumType); + } + + /** + * Returns type information for Flink's {@link org.apache.flink.types.Either} type. Null values + * are not supported. + * + * <p>Either type can be used for a value of two possible types. + * + * <p>Example use: <code>Types.EITHER(Types.VOID, Types.INT)</code> + * + * @param leftType type information of left side / {@link org.apache.flink.types.Either.Left} + * @param rightType type information of right side / {@link org.apache.flink.types.Either.Right} + */ + public static <L, R> TypeInformation<Either<L, R>> EITHER(TypeInformation<L> leftType, TypeInformation<R> rightType) { + return new EitherTypeInfo<>(leftType, rightType); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index 33820e5..12a9ae0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ - + package org.apache.flink.api.java.typeutils; import java.lang.reflect.Field; @@ -31,6 +31,10 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.types.Value; +/** + * @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead. + */ +@Deprecated @Public public class TypeInfoParser { private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple"; http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala index 100c22b..4be137d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala @@ -17,6 +17,8 @@ */ package org.apache.flink.table.api +import _root_.java.{lang, math, sql, util} + import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes} import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo @@ -25,35 +27,95 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** + * Returns type information for a Table API string or SQL VARCHAR type. + */ + val STRING: TypeInformation[String] = JTypes.STRING + + /** + * Returns type information for a Table API boolean or SQL BOOLEAN type. 
+ */ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** + * Returns type information for a Table API byte or SQL TINYINT type. + */ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** + * Returns type information for a Table API short or SQL SMALLINT type. + */ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** + * Returns type information for a Table API integer or SQL INT/INTEGER type. + */ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** + * Returns type information for a Table API long or SQL BIGINT type. + */ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** + * Returns type information for a Table API float or SQL FLOAT/REAL type. + */ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** + * Returns type information for a Table API double or SQL DOUBLE type. + */ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** - * Generates row type information. + * Returns type information for a Table API big decimal or SQL DECIMAL type. + */ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** + * Returns type information for a Table API SQL date or SQL DATE type. + */ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE + + /** + * Returns type information for a Table API SQL time or SQL TIME type. + */ + val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME + + /** + * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type. 
+ */ + val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** + * Returns type information for a Table API interval of months. + */ + val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS + + /** + * Returns type information for a Table API interval milliseconds. + */ + val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS + + /** + * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. + * + * A row is a variable-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null regardless of the field's type. + * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. * - * A row type consists of zero or more fields with a field name and a corresponding type. + * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all + * row instances must strictly adhere to the schema defined by the type info. * - * The fields have the default names (f0, f1, f2 ..). + * This method generates type information with fields of the given types; the fields have + * the default names (f0, f1, f2 ..). * - * @param types types of row fields; e.g. Types.STRING, Types.INT + * @param types The types of the row fields, e.g., Types.STRING, Types.INT */ @varargs def ROW(types: TypeInformation[_]*): TypeInformation[Row] = { @@ -61,19 +123,29 @@ object Types { } /** - * Generates row type information. + * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types + * and with given names. + * + * A row is a variable-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null independent of the field's type. 
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. + * + * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all + * row instances must strictly adhere to the schema defined by the type info. * - * A row type consists of zero or more fields with a field name and a corresponding type. + * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`. * - * @param names names of row fields, e.g. "userid", "name" - * @param types types of row fields; e.g. Types.STRING, Types.INT + * @param fieldNames array of field names + * @param types array of field types */ - def ROW(names: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = { - JTypes.ROW_NAMED(names, types: _*) + def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = { + JTypes.ROW_NAMED(fieldNames, types: _*) } /** - * Generates type information for an array consisting of Java primitive elements. + * Generates type information for an array consisting of Java primitive elements. The elements + * do not support null values. * * @param elementType type of the array elements; e.g. Types.INT */ @@ -93,30 +165,35 @@ object Types { } /** - * Generates type information for an array consisting of Java object elements. + * Generates type information for an array consisting of Java object elements. Null values for + * elements are supported. * * @param elementType type of the array elements; e.g. Types.STRING or Types.INT */ - def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = { + def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] = { ObjectArrayTypeInfo.getInfoFor(elementType) } /** - * Generates type information for a Java HashMap. + * Generates type information for a Java HashMap. Null values in keys are not supported. An + * entry's value can be null. 
* * @param keyType type of the keys of the map e.g. Types.STRING * @param valueType type of the values of the map e.g. Types.STRING */ - def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = { + def MAP[K, V]( + keyType: TypeInformation[K], + valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = { new MapTypeInfo(keyType, valueType) } /** - * Generates type information for a Multiset. + * Generates type information for a Multiset. A Multiset is backed by a Java HashMap and maps an + * arbitrary key to an integer value. Null values in keys are not supported. * * @param elementType type of the elements of the multiset e.g. Types.STRING */ - def MULTISET(elementType: TypeInformation[_]): TypeInformation[_] = { + def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] = { new MultisetTypeInfo(elementType) } } http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala index 0d12400..110ea6a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala @@ -20,9 +20,8 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala._ import org.apache.flink.table.api._ -import
org.apache.flink.table.runtime.utils._ +import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils._ import org.apache.flink.types.Row import org.junit.Test http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala new file mode 100644 index 0000000..4ce9b0f --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or cases where automatic type inference results in an inefficient type. + * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** + * Generates type information based on the given class and/or its type parameters. + * + * The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does + * not require to implement anonymous classes. + * + * If the class could not be analyzed by the Scala type analyzer, the Java analyzer + * will be used. + * + * Example use: + * + * `Types.of[(Int, String, String)]` for Scala tuples + * `Types.of[Unit]` for Scala specific types + * + * @tparam T class to be analyzed + */ + def of[T: TypeInformation]: TypeInformation[T] = { + val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] + typeInfo + } + + /** + * Returns type information for Scala [[Nothing]]. Does not support a null value. 
+ */ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** + * Returns type information for Scala [[Unit]]. Does not support a null value. + */ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** + * Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. + */ + val STRING: TypeInformation[String] = JTypes.STRING + + /** + * Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not + * support a null value. + */ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** + * Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not + * support a null value. + */ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** + * Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not + * support a null value. + */ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** + * Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not + * support a null value. + */ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** + * Returns type information for primitive [[Long]] and [[java.lang.Long]]. Does not + * support a null value. + */ + val LONG: TypeInformation[java.lang.Long] = JTypes.LONG + + /** + * Returns type information for primitive [[Float]] and [[java.lang.Float]]. Does not + * support a null value. + */ + val FLOAT: TypeInformation[java.lang.Float] = JTypes.FLOAT + + /** + * Returns type information for primitive [[Double]] and [[java.lang.Double]]. Does not + * support a null value. + */ + val DOUBLE: TypeInformation[java.lang.Double] = JTypes.DOUBLE + + /** + * Returns type information for primitive [[Char]] and [[java.lang.Character]]. Does not + * support a null value. + */ + val CHAR: TypeInformation[java.lang.Character] = JTypes.CHAR + + /** + * Returns type information for Java [[java.math.BigDecimal]]. Supports a null value. 
+ * + * Note that Scala [[BigDecimal]] is not supported yet. + */ + val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal] = JTypes.BIG_DEC + + /** + * Returns type information for Java [[java.math.BigInteger]]. Supports a null value. + * + * Note that Scala [[BigInt]] is not supported yet. + */ + val JAVA_BIG_INT: TypeInformation[java.math.BigInteger] = JTypes.BIG_INT + + /** + * Returns type information for [[java.sql.Date]]. Supports a null value. + */ + val SQL_DATE: TypeInformation[java.sql.Date] = JTypes.SQL_DATE + + /** + * Returns type information for [[java.sql.Time]]. Supports a null value. + */ + val SQL_TIME: TypeInformation[java.sql.Time] = JTypes.SQL_TIME + + /** + * Returns type information for [[java.sql.Timestamp]]. Supports a null value. + */ + val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** + * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. + * A row itself must not be null. + * + * A row is a fixed-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null regardless of the field's type. + * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. + * + * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row + * instances must strictly adhere to the schema defined by the type info. + * + * This method generates type information with fields of the given types; the fields have + * the default names (f0, f1, f2 ..). + * + * @param types The types of the row fields, e.g., Types.STRING, Types.INT + */ + def ROW(types: TypeInformation[_]*): TypeInformation[Row] = JTypes.ROW(types: _*) + + /** + * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types + * and with given names. A row must not be null. 
+ * + * A row is a fixed-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null independent of the field's type. + * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. + * + * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row + * instances must strictly adhere to the schema defined by the type info. + * + * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`. + * + * @param fieldNames array of field names + * @param types array of field types + */ + def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = + JTypes.ROW_NAMED(fieldNames, types: _*) + + /** + * Returns type information for a POJO (Plain Old Java Object). + * + * A POJO class is public and standalone (no non-static inner class). It has a public + * no-argument constructor. All non-static, non-transient fields in the class (and all + * superclasses) are either public (and non-final) or have a public getter and a setter + * method that follows the Java beans naming conventions for getters and setters. + * + * A POJO is a fixed-length, null-aware composite type with non-deterministic field order. + * Every field can be null independent of the field's type. + * + * The generic types for all fields of the POJO can be defined in a hierarchy of subclasses. + * + * If Flink's type analyzer is unable to extract a valid POJO type information with + * type information for all fields, an + * [[org.apache.flink.api.common.functions.InvalidTypesException]] is thrown. + * Alternatively, you can use [[Types.POJO(Class, Map)]] to specify all fields manually.
+ * + * @param pojoClass POJO class to be analyzed by Flink + */ + def POJO[T](pojoClass: Class[T]): TypeInformation[T] = { + JTypes.POJO(pojoClass) + } + + /** + * Returns type information for a POJO (Plain Old Java Object) and allows to specify all + * fields manually. + * + * A POJO class is public and standalone (no non-static inner class). It has a public no-argument + * constructor. All non-static, non-transient fields in the class (and all superclasses) are + * either public (and non-final) or have a public getter and a setter method that follows the + * Java beans naming conventions for getters and setters. + * + * A POJO is a fixed-length, null-aware composite type with non-deterministic field order. + * Every field can be null independent of the field's type. + * + * The generic types for all fields of the POJO can be defined in a hierarchy of subclasses. + * + * If Flink's type analyzer is unable to extract a POJO field, an + * [[org.apache.flink.api.common.functions.InvalidTypesException]] is thrown. + * + * '''Note:''' In most cases the type information of fields can be determined automatically, + * we recommend to use [[Types.POJO(Class)]]. + * + * @param pojoClass POJO class + * @param fields map of fields that map a name to type information. The map key is the name of + * the field and the value is its type. + */ + def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T] = { + JTypes.POJO(pojoClass, fields.asJava) + } + + /** + * Returns generic type information for any Scala/Java object. The serialization logic will + * use the general purpose serializer Kryo. + * + * Generic types are black-boxes for Flink, but allow any object and null values in fields. + * + * By default, serialization of this type is not very efficient. Please read the documentation + * about how to improve efficiency (namely by pre-registering classes). 
+ * + * @param genericClass any Scala/Java class + */ + def GENERIC[T](genericClass: Class[T]): TypeInformation[T] = JTypes.GENERIC(genericClass) + + /** + * Returns type information for a Scala case class and Scala tuples. + * + * A Scala case class is a fixed-length composite type for storing multiple values in a + * deterministic field order. Fields of a case class are typed. Case classes and tuples are + * the most efficient composite type; therefore, they do not support null-valued fields + * unless the type of the field supports nullability. + * + * Example use: `Types.CASE_CLASS[MyCaseClass]` + * + * @tparam T case class to be analyzed + */ + def CASE_CLASS[T: TypeInformation]: TypeInformation[T] = { + val t = Types.of[T] + if (t.isInstanceOf[CaseClassTypeInfo[_]]) { + t + } else { + throw new InvalidTypesException("Case class type expected but was: " + t) + } + } + + /** + * Returns type information for a Scala tuple. + * + * A Scala tuple is a fixed-length composite type for storing multiple values in a + * deterministic field order. Fields of a tuple are typed. Tuples are + * the most efficient composite type; therefore, they do not support null-valued fields + * unless the type of the field supports nullability. + * + * Example use: `Types.TUPLE[(String, Int)]` + * + * @tparam T tuple to be analyzed + */ + def TUPLE[T: TypeInformation]: TypeInformation[T] = { + CASE_CLASS[T] + } + + /** + * Returns type information for Scala/Java arrays of primitive type (such as `Array[Byte]`). + * The array and its elements do not support null values. + * + * @param elementType element type of the array (e.g. Types.BOOLEAN, Types.INT, Types.DOUBLE) + */ + def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = { + JTypes.PRIMITIVE_ARRAY(elementType) + } + + /** + * Returns type information for Scala/Java arrays of object types (such as `Array[String]`, + * `Array[java.lang.Integer]`). The array itself must not be null.
Null values for elements + * are supported. + * + * @param elementType element type of the array + */ + def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]] = { + // necessary for the Scala compiler + JTypes.OBJECT_ARRAY(elementType).asInstanceOf[TypeInformation[Array[E]]] + } + + /** + * Returns type information for Scala [[Either]] type. Null values are not supported. + * + * The either type can be used for a value of two possible types. + * + * Example use: `Types.EITHER(Types.INT, Types.NOTHING)` + * + * @param leftType type information of left side / [[Left]] + * @param rightType type information of right side / [[Right]] + */ + def EITHER[A, B]( + leftType: TypeInformation[A], + rightType: TypeInformation[B]): TypeInformation[Either[A, B]] = { + new EitherTypeInfo(classOf[Either[A, B]], leftType, rightType) + } + + /** + * Returns type information for Scala [[Option]] type. Null values are not supported. + * + * The option type can be used for distinguishing between a value or no value. + * + * Example use: `Types.OPTION(Types.INT)` + * + * @param valueType type information of the option's value + */ + def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T] = { + new OptionTypeInfo(valueType) + } + + /** + * Returns type information for Scala [[Try]] type. Null values are not supported. + * + * The try type can be used for distinguishing between a value or throwable. + * + * Example use: `Types.TRY(Types.INT)` + * + * @param valueType type information of the try's value + */ + def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T] = { + new TryTypeInfo(valueType) + } + + /** + * Returns type information for Scala enumerations. Null values are not supported.
+ * + * @param enum enumeration object + * @param valueClass value class + */ + def ENUMERATION[E <: Enumeration]( + enum: E, + valueClass: Class[E#Value]): TypeInformation[E#Value] = { + new EnumValueTypeInfo(enum, valueClass) + } + + /** + * Returns type information for Scala collections that implement [[Traversable]]. Null values + * are not supported. + */ + def TRAVERSABLE[T: TypeInformation]: TypeInformation[T] = { + val t = Types.of[T] + if (t.isInstanceOf[TraversableTypeInfo[_, _]]) { + t + } else { + throw new InvalidTypesException("Traversable type expected but was: " + t) + } + } +}