This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 185ed0e2350b132bbec20b54e6cae6fe72710eae Author: Timo Walther <twal...@apache.org> AuthorDate: Tue May 21 15:45:47 2019 +0200 [FLINK-12254][table-common] Add a converter between old type information behavior and data type --- .../types/logical/LegacyTypeInformationType.java | 120 +++++++ .../utils/LegacyTypeInfoDataTypeConverter.java | 352 +++++++++++++++++++++ .../flink/table/types/utils/TypeConversions.java | 85 +++++ .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 +++++++++ 4 files changed, 704 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java new file mode 100644 index 0000000..a6d6a20 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * This type is a temporary solution to fully support the old type system stack through the new + * stack. Many types can be mapped directly to the new type system, however, some types such as + * {@code DECIMAL}, POJOs, or case classes need special handling. + * + * <p>This type differs from {@link TypeInformationAnyType}. This type is allowed to travel through + * the stack whereas {@link TypeInformationAnyType} should be resolved eagerly to {@link AnyType} by + * the planner. + * + * <p>This class can be removed once we have removed all deprecated methods that take or return + * {@link TypeInformation}. + * + * @see LegacyTypeInfoDataTypeConverter + */ +@Internal +public final class LegacyTypeInformationType<T> extends LogicalType { + + private static final String FORMAT = "LEGACY(%s)"; + + private final TypeInformation<T> typeInfo; + + public LegacyTypeInformationType(LogicalTypeRoot logicalTypeRoot, TypeInformation<T> typeInfo) { + super(true, logicalTypeRoot); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + public TypeInformation<T> getTypeInformation() { + return typeInfo; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new LegacyTypeInformationType<>(getTypeRoot(), typeInfo); + } + + @Override + public String asSerializableString() { + throw new TableException("Legacy type information has no serializable string representation."); + } + + @Override + public String asSummaryString() { + return withNullability(FORMAT, typeInfo); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return typeInfo.getTypeClass().isAssignableFrom(clazz); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return clazz.isAssignableFrom(typeInfo.getTypeClass()); + } + + @Override + public Class<?> getDefaultConversion() { + return typeInfo.getTypeClass(); + } + + @Override + public List<LogicalType> getChildren() { + return Collections.emptyList(); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LegacyTypeInformationType<?> that = (LegacyTypeInformationType<?>) o; + return typeInfo.equals(that.typeInfo); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), typeInfo); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java new file mode 100644 index 0000000..5c31be1 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.MultisetTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.KeyValueDataType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TypeInformationAnyType; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.table.typeutils.TimeIntervalTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute; + +/** + * Converter between {@link TypeInformation} and {@link DataType} that reflects the behavior before + * Flink 1.9. The conversion is a 1:1 mapping that allows back-and-forth conversion. + * + * <p>This converter only exists to still support deprecated methods that take or return {@link TypeInformation}. + * Some methods will still support type information in the future, however, the future type information + * support will integrate nicer with the new type stack. This converter reflects the old behavior that includes: + * + * <p>Use old {@code java.sql.*} time classes for time data types. + * + * <p>Only support millisecond precision for timestamps or day-time intervals. + * + * <p>Do not support fractional seconds for the time type. + * + * <p>Let variable precision and scale for decimal types pass through the planner. + * + * <p>Let POJOs, case classes, and tuples pass through the planner. + * + * <p>Inconsistent nullability. Most types are nullable even though type information does not support it. + * + * <p>Distinction between {@link BasicArrayTypeInfo} and {@link ObjectArrayTypeInfo}. + */ +@Internal +public final class LegacyTypeInfoDataTypeConverter { + + private static final Map<TypeInformation<?>, DataType> typeInfoDataTypeMap = new HashMap<>(); + private static final Map<DataType, TypeInformation<?>> dataTypeTypeInfoMap = new HashMap<>(); + static { + addMapping(Types.STRING, DataTypes.STRING().bridgedTo(String.class)); + addMapping(Types.BOOLEAN, DataTypes.BOOLEAN().bridgedTo(Boolean.class)); + addMapping(Types.BYTE, DataTypes.TINYINT().bridgedTo(Byte.class)); + addMapping(Types.SHORT, DataTypes.SMALLINT().bridgedTo(Short.class)); + addMapping(Types.INT, DataTypes.INT().bridgedTo(Integer.class)); + addMapping(Types.LONG, DataTypes.BIGINT().bridgedTo(Long.class)); + addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class)); + addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class)); + addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC)); + addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class)); + addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class)); + addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)); + addMapping( + TimeIntervalTypeInfo.INTERVAL_MONTHS, + DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class)); + addMapping( + TimeIntervalTypeInfo.INTERVAL_MILLIS, + DataTypes.INTERVAL(DataTypes.SECOND(3)).bridgedTo(Long.class)); + addMapping( + PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.ARRAY(DataTypes.BOOLEAN().notNull().bridgedTo(boolean.class)).bridgedTo(boolean[].class)); + addMapping( + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.BYTES().bridgedTo(byte[].class)); + addMapping( + PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.ARRAY(DataTypes.SMALLINT().notNull().bridgedTo(short.class)).bridgedTo(short[].class)); + addMapping( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.ARRAY(DataTypes.INT().notNull().bridgedTo(int.class)).bridgedTo(int[].class)); + addMapping( + PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class)).bridgedTo(long[].class)); + addMapping( + PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class)).bridgedTo(float[].class)); + addMapping( + PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, + DataTypes.ARRAY(DataTypes.DOUBLE().notNull().bridgedTo(double.class)).bridgedTo(double[].class)); + } + + private static void addMapping(TypeInformation<?> typeInfo, DataType dataType) { + Preconditions.checkArgument(!typeInfoDataTypeMap.containsKey(typeInfo)); + typeInfoDataTypeMap.put(typeInfo, dataType); + dataTypeTypeInfoMap.put(dataType, typeInfo); + } + + public static DataType toDataType(TypeInformation<?> typeInfo) { + // time indicators first as their hashCode/equals is shared with those of regular timestamps + if (typeInfo instanceof TimeIndicatorTypeInfo) { + return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo); + } + + final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo); + if (foundDataType != null) { + return foundDataType; + } + + if (typeInfo instanceof RowTypeInfo) { + return convertToRowType((RowTypeInfo) typeInfo); + } + + else if (typeInfo instanceof ObjectArrayTypeInfo) { + return convertToArrayType( + typeInfo.getTypeClass(), + ((ObjectArrayTypeInfo) typeInfo).getComponentInfo()); + } + + else if (typeInfo instanceof BasicArrayTypeInfo) { + return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo); + } + + else if (typeInfo instanceof MultisetTypeInfo) { + return convertToMultisetType(((MultisetTypeInfo) typeInfo).getElementTypeInfo()); + } + + else if (typeInfo instanceof MapTypeInfo) { + return convertToMapType((MapTypeInfo) typeInfo); + } + + else if (typeInfo instanceof CompositeType) { + return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo); + } + + return createLegacyType(LogicalTypeRoot.ANY, typeInfo); + } + + public static TypeInformation<?> toLegacyTypeInfo(DataType dataType) { + // time indicators first as their hashCode/equals is shared with those of regular timestamps + if (canConvertToTimeAttributeTypeInfo(dataType)) { + return convertToTimeAttributeTypeInfo((TimestampType) dataType.getLogicalType()); + } + + final TypeInformation<?> foundTypeInfo = dataTypeTypeInfoMap.get(dataType); + if (foundTypeInfo != null) { + return foundTypeInfo; + } + + if (canConvertToLegacyTypeInfo(dataType)) { + return convertToLegacyTypeInfo(dataType); + } + + else if (canConvertToRowTypeInfo(dataType)) { + return convertToRowTypeInfo((FieldsDataType) dataType); + } + + // this could also match for basic array type info but this is covered by legacy type info + else if (canConvertToObjectArrayTypeInfo(dataType)) { + return convertToObjectArrayTypeInfo((CollectionDataType) dataType); + } + + else if (canConvertToMultisetTypeInfo(dataType)) { + return convertToMultisetTypeInfo((CollectionDataType) dataType); + } + + else if (canConvertToMapTypeInfo(dataType)) { + return convertToMapTypeInfo((KeyValueDataType) dataType); + } + + // makes the any type accessible in the legacy planner + else if (canConvertToAnyTypeInfo(dataType)) { + return convertToAnyTypeInfo(dataType); + } + + throw new TableException( + String.format( + "Unsupported conversion from data type '%s' to type information. Only data types " + + "that originated from type information fully support a reverse conversion.", + dataType)); + } + + private static DataType createLegacyType(LogicalTypeRoot typeRoot, TypeInformation<?> typeInfo) { + return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, typeInfo)) + .bridgedTo(typeInfo.getTypeClass()); + } + + private static DataType convertToTimeAttributeType(TimeIndicatorTypeInfo timeIndicatorTypeInfo) { + final TimestampKind kind; + if (timeIndicatorTypeInfo.isEventTime()) { + kind = TimestampKind.ROWTIME; + } else { + kind = TimestampKind.PROCTIME; + } + return new AtomicDataType(new TimestampType(true, kind, 3)) + .bridgedTo(java.sql.Timestamp.class); + } + + private static boolean canConvertToTimeAttributeTypeInfo(DataType dataType) { + return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) && + dataTypeTypeInfoMap.containsKey(dataType) && // checks precision and conversion + ((TimestampType) dataType.getLogicalType()).getKind() != TimestampKind.REGULAR; + } + + private static TypeInformation<?> convertToTimeAttributeTypeInfo(TimestampType timestampType) { + if (isRowtimeAttribute(timestampType)) { + return TimeIndicatorTypeInfo.ROWTIME_INDICATOR; + } else { + return TimeIndicatorTypeInfo.PROCTIME_INDICATOR; + } + } + + private static DataType convertToRowType(RowTypeInfo rowTypeInfo) { + final String[] fieldNames = rowTypeInfo.getFieldNames(); + final DataTypes.Field[] fields = IntStream.range(0, rowTypeInfo.getArity()) + .mapToObj(i -> { + DataType fieldType = toDataType(rowTypeInfo.getTypeAt(i)); + + return DataTypes.FIELD( + fieldNames[i], + fieldType); + }) + .toArray(DataTypes.Field[]::new); + + return DataTypes.ROW(fields).bridgedTo(Row.class); + } + + private static boolean canConvertToRowTypeInfo(DataType dataType) { + return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW) && + dataType.getConversionClass().equals(Row.class) && + ((RowType) dataType.getLogicalType()).getFields().stream() + .noneMatch(f -> f.getDescription().isPresent()); + } + + private static TypeInformation<?> convertToRowTypeInfo(FieldsDataType fieldsDataType) { + final RowType rowType = (RowType) fieldsDataType.getLogicalType(); + + final String[] fieldNames = rowType.getFields() + .stream() + .map(RowType.RowField::getName) + .toArray(String[]::new); + + final TypeInformation<?>[] fieldTypes = Stream.of(fieldNames) + .map(name -> fieldsDataType.getFieldDataTypes().get(name)) + .map(LegacyTypeInfoDataTypeConverter::toLegacyTypeInfo) + .toArray(TypeInformation[]::new); + + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + private static DataType convertToArrayType(Class<?> arrayClass, TypeInformation<?> elementTypeInfo) { + return DataTypes.ARRAY(toDataType(elementTypeInfo)).bridgedTo(arrayClass); + } + + private static boolean canConvertToObjectArrayTypeInfo(DataType dataType) { + return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ARRAY) && + dataType.getConversionClass().isArray(); + } + + private static TypeInformation<?> convertToObjectArrayTypeInfo(CollectionDataType collectionDataType) { + // Types.OBJECT_ARRAY would return a basic type info for strings + return ObjectArrayTypeInfo.getInfoFor( + toLegacyTypeInfo(collectionDataType.getElementDataType())); + } + + private static DataType convertToMultisetType(TypeInformation elementTypeInfo) { + return DataTypes.MULTISET(toDataType(elementTypeInfo)).bridgedTo(Map.class); + } + + private static boolean canConvertToMultisetTypeInfo(DataType dataType) { + return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.MULTISET) && + dataType.getConversionClass() == Map.class; + } + + private static TypeInformation<?> convertToMultisetTypeInfo(CollectionDataType collectionDataType) { + return new MultisetTypeInfo<>( + toLegacyTypeInfo(collectionDataType.getElementDataType())); + } + + private static DataType convertToMapType(MapTypeInfo typeInfo) { + return DataTypes.MAP( + toDataType(typeInfo.getKeyTypeInfo()), + toDataType(typeInfo.getValueTypeInfo())) + .bridgedTo(Map.class); + } + + private static boolean canConvertToMapTypeInfo(DataType dataType) { + return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.MAP) && + dataType.getConversionClass() == Map.class; + } + + private static TypeInformation<?> convertToMapTypeInfo(KeyValueDataType dataType) { + return Types.MAP( + toLegacyTypeInfo(dataType.getKeyDataType()), + toLegacyTypeInfo(dataType.getValueDataType())); + } + + private static boolean canConvertToLegacyTypeInfo(DataType dataType) { + return dataType.getLogicalType() instanceof LegacyTypeInformationType; + } + + private static TypeInformation<?> convertToLegacyTypeInfo(DataType dataType) { + return ((LegacyTypeInformationType) dataType.getLogicalType()).getTypeInformation(); + } + + private static boolean canConvertToAnyTypeInfo(DataType dataType) { + return dataType.getLogicalType() instanceof TypeInformationAnyType && + dataType.getConversionClass().equals( + ((TypeInformationAnyType) dataType.getLogicalType()).getTypeInformation().getTypeClass()); + } + + private static TypeInformation<?> convertToAnyTypeInfo(DataType dataType) { + return ((TypeInformationAnyType) dataType.getLogicalType()).getTypeInformation(); + } + + private LegacyTypeInfoDataTypeConverter() { + // no instantiation + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java new file mode 100644 index 0000000..a5c978f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.Optional; +import java.util.stream.Stream; + +/** + * Conversion hub for interoperability of {@link Class}, {@link TypeInformation}, {@link DataType}, + * and {@link LogicalType}. + * + * <p>See the corresponding converter classes for more information about how the conversion is performed. + */ +@Internal +public final class TypeConversions { + + public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) { + return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo); + } + + public static DataType[] fromLegacyInfoToDataType(TypeInformation<?>[] typeInfo) { + return Stream.of(typeInfo) + .map(TypeConversions::fromLegacyInfoToDataType) + .toArray(DataType[]::new); + } + + public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType) { + return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(dataType); + } + + public static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) { + return Stream.of(dataType) + .map(TypeConversions::fromDataTypeToLegacyInfo) + .toArray(TypeInformation[]::new); + } + + public static Optional<DataType> fromClassToDataType(Class<?> clazz) { + return ClassDataTypeConverter.extractDataType(clazz); + } + + public static DataType fromLogicalToDataType(LogicalType logicalType) { + return LogicalTypeDataTypeConverter.toDataType(logicalType); + } + + public static DataType[] fromLogicalToDataType(LogicalType[] logicalTypes) { + return Stream.of(logicalTypes) + .map(LogicalTypeDataTypeConverter::toDataType) + .toArray(DataType[]::new); + } + + public static LogicalType fromDataToLogicalType(DataType dataType) { + return dataType.getLogicalType(); + } + + public static LogicalType[] fromDataToLogicalType(DataType[] dataTypes) { + return Stream.of(dataTypes) + .map(TypeConversions::fromDataToLogicalType) + .toArray(LogicalType[]::new); + } + + private TypeConversions() { + // no instance + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java new file mode 100644 index 0000000..fbce34e --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link LegacyTypeInfoDataTypeConverter}. + */ +@RunWith(Parameterized.class) +public class LegacyTypeInfoDataTypeConverterTest { + + @Parameters(name = "[{index}] type info: {0} data type: {1}") + public static List<Object[]> typeInfo() { + return Arrays.asList( + new Object[][]{ + {Types.STRING, DataTypes.STRING()}, + + {Types.BOOLEAN, DataTypes.BOOLEAN()}, + + {Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)}, + + { + Types.GENERIC(LegacyTypeInfoDataTypeConverterTest.class), + new AtomicDataType( + new LegacyTypeInformationType<>( + LogicalTypeRoot.ANY, + Types.GENERIC(LegacyTypeInfoDataTypeConverterTest.class))) + }, + + { + Types.ROW_NAMED(new String[] {"field1", "field2"}, Types.INT, Types.LONG), + DataTypes.ROW( + FIELD("field1", DataTypes.INT()), + FIELD("field2", DataTypes.BIGINT())) + }, + + { + Types.MAP(Types.FLOAT, Types.ROW(Types.BYTE)), + DataTypes.MAP(DataTypes.FLOAT(), DataTypes.ROW(FIELD("f0", DataTypes.TINYINT()))) + }, + + { + Types.PRIMITIVE_ARRAY(Types.FLOAT), + DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class)) + .bridgedTo(float[].class) + }, + + { + Types.PRIMITIVE_ARRAY(Types.BYTE), + DataTypes.BYTES() + }, + + { + Types.OBJECT_ARRAY(Types.PRIMITIVE_ARRAY(Types.FLOAT)), + DataTypes.ARRAY( + DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class)) + .bridgedTo(float[].class)) + .bridgedTo(float[][].class) + }, + + { + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, + new AtomicDataType( + new LegacyTypeInformationType<>( + LogicalTypeRoot.ARRAY, + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)) + }, + + { + ObjectArrayTypeInfo.getInfoFor(Types.STRING), + DataTypes.ARRAY(DataTypes.STRING()) + .bridgedTo(String[].class) + }, + + { + Types.TUPLE(Types.SHORT, Types.DOUBLE, Types.FLOAT), + new AtomicDataType( + new LegacyTypeInformationType<>( + LogicalTypeRoot.STRUCTURED_TYPE, + Types.TUPLE(Types.SHORT, Types.DOUBLE, Types.FLOAT))) + }, + + { + TimeIndicatorTypeInfo.ROWTIME_INDICATOR, + new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3)) + .bridgedTo(java.sql.Timestamp.class) + } + } + ); + } + + @Parameter + public TypeInformation<?> inputTypeInfo; + + @Parameter(1) + public DataType dataType; + + @Test + public void testTypeInfoToDataTypeConversion() { + assertThat(LegacyTypeInfoDataTypeConverter.toDataType(inputTypeInfo), equalTo(dataType)); + } + + @Test + public void testDataTypeToTypeInfoConversion() { + assertThat(LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(dataType), equalTo(inputTypeInfo)); + } +}