This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c3a7b6371335d86a4f8ee41ade96c96b3fd9881a Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Apr 12 15:19:27 2021 +0200 [FLINK-20613][table] Introduce a legacy to non-legacy data type transformation --- .../table/types/inference/TypeTransformation.java | 12 +++++++ .../table/types/inference/TypeTransformations.java | 14 ++++---- ...n.java => LegacyToNonLegacyTransformation.java} | 40 ++++++++++++++-------- .../flink/table/types/utils/DataTypeUtils.java | 31 ++++++++++++++--- .../types/utils/TypeInfoDataTypeConverter.java | 22 +++++++++--- .../types/inference/TypeTransformationsTest.java | 24 ------------- .../table/planner/connectors/DynamicSinkUtils.java | 23 +++++++++---- .../table/planner/delegation/PlannerBase.scala | 17 +++++++-- .../flink/table/planner/sinks/TableSinkUtils.scala | 9 +---- 9 files changed, 120 insertions(+), 72 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java index 97cdfd4..3a14033 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java @@ -19,12 +19,24 @@ package org.apache.flink.table.types.inference; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.types.DataType; +import javax.annotation.Nullable; + /** Transforms one data type to another. */ @PublicEvolving public interface TypeTransformation { /** Transforms the given data type to a different data type. */ DataType transform(DataType typeToTransform); + + /** + * Transforms the given data type to a different data type. + * + * <p>This method provides a {@link DataTypeFactory} if available. + */ + default DataType transform(@Nullable DataTypeFactory factory, DataType typeToTransform) { + return transform(typeToTransform); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java index 72ff8d1..28afb9a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java @@ -21,8 +21,8 @@ package org.apache.flink.table.types.inference; import org.apache.flink.annotation.Internal; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.transforms.DataTypeConversionClassTransformation; -import org.apache.flink.table.types.inference.transforms.LegacyDecimalTypeTransformation; import org.apache.flink.table.types.inference.transforms.LegacyRawTypeTransformation; +import org.apache.flink.table.types.inference.transforms.LegacyToNonLegacyTransformation; import org.apache.flink.table.types.logical.LogicalTypeRoot; import java.sql.Date; @@ -59,13 +59,6 @@ public final class TypeTransformations { } /** - * Returns a type transformation that transforms legacy decimal data type to DECIMAL(38, 18). - */ - public static TypeTransformation legacyDecimalToDefaultDecimal() { - return LegacyDecimalTypeTransformation.INSTANCE; - } - - /** * Returns a type transformation that transforms LEGACY('RAW', ...) type to the RAW(..., ?) * type. */ @@ -73,6 +66,11 @@ public final class TypeTransformations { return LegacyRawTypeTransformation.INSTANCE; } + /** Returns a type transformation that transforms LEGACY(...) type to a non-legacy type. */ + public static TypeTransformation legacyToNonLegacy() { + return LegacyToNonLegacyTransformation.INSTANCE; + } + /** * Returns a type transformation that transforms data type to nullable data type but keeps other * information unchanged. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyToNonLegacyTransformation.java similarity index 54% rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyToNonLegacyTransformation.java index b5e3057..cbd11d5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyToNonLegacyTransformation.java @@ -18,32 +18,42 @@ package org.apache.flink.table.types.inference.transforms; -import org.apache.flink.table.api.DataTypes; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.TypeTransformation; -import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter; + +import javax.annotation.Nullable; /** - * This type transformation transforms the legacy decimal type (usually converted from {@link - * org.apache.flink.api.common.typeinfo.Types#BIG_DEC}) to DECIMAL(38, 18). + * Transformation that applies {@link TypeInfoDataTypeConverter} on {@link + * LegacyTypeInformationType}. */ -public class LegacyDecimalTypeTransformation implements TypeTransformation { +@Internal +public class LegacyToNonLegacyTransformation implements TypeTransformation { - public static final TypeTransformation INSTANCE = new LegacyDecimalTypeTransformation(); + public static final TypeTransformation INSTANCE = new LegacyToNonLegacyTransformation(); @Override public DataType transform(DataType typeToTransform) { - LogicalType logicalType = typeToTransform.getLogicalType(); - if (logicalType instanceof LegacyTypeInformationType - && logicalType.getTypeRoot() == LogicalTypeRoot.DECIMAL) { - DataType decimalType = - DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18) - .bridgedTo(typeToTransform.getConversionClass()); - return logicalType.isNullable() ? decimalType : decimalType.notNull(); + return transform(null, typeToTransform); + } + + @Override + public DataType transform(@Nullable DataTypeFactory factory, DataType dataType) { + if (factory == null) { + throw new TableException( + "LegacyToNonLegacyTransformation requires access to the data type factory."); + } + final LogicalType type = dataType.getLogicalType(); + if (type instanceof LegacyTypeInformationType) { + return TypeInfoDataTypeConverter.toDataType( + factory, ((LegacyTypeInformationType<?>) type).getTypeInformation(), true); } - return typeToTransform; + return dataType; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java index a7c5af7..320edaf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.types.AtomicDataType; @@ -51,6 +52,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -228,23 +231,36 @@ public final class DataTypeUtils { /** * Transforms the given data type to a different data type using the given transformations. * + * @see #transform(DataTypeFactory, DataType, TypeTransformation...) + */ + public static DataType transform( + DataType typeToTransform, TypeTransformation... transformations) { + return transform(null, typeToTransform, transformations); + } + + /** + * Transforms the given data type to a different data type using the given transformations. + * * <p>The transformations will be called in the given order. In case of constructed or composite * types, a transformation will be applied transitively to children first. * * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can * be transformed. * + * @param factory {@link DataTypeFactory} if available * @param typeToTransform data type to be transformed. * @param transformations the transformations to transform data type to another type. * @return the new data type */ public static DataType transform( - DataType typeToTransform, TypeTransformation... transformations) { + @Nullable DataTypeFactory factory, + DataType typeToTransform, + TypeTransformation... transformations) { Preconditions.checkArgument( transformations.length > 0, "transformations should not be empty."); DataType newType = typeToTransform; for (TypeTransformation transformation : transformations) { - newType = newType.accept(new DataTypeTransformer(transformation)); + newType = newType.accept(new DataTypeTransformer(factory, transformation)); } return newType; } @@ -407,15 +423,19 @@ public final class DataTypeUtils { */ private static class DataTypeTransformer implements DataTypeVisitor<DataType> { + private final @Nullable DataTypeFactory factory; + private final TypeTransformation transformation; - private DataTypeTransformer(TypeTransformation transformation) { + private DataTypeTransformer( + @Nullable DataTypeFactory factory, TypeTransformation transformation) { + this.factory = factory; this.transformation = transformation; } @Override public DataType visit(AtomicDataType atomicDataType) { - return transformation.transform(atomicDataType); + return transformation.transform(factory, atomicDataType); } @Override @@ -434,6 +454,7 @@ public final class DataTypeUtils { "Unsupported logical type : " + logicalType); } return transformation.transform( + factory, new CollectionDataType( newLogicalType, collectionDataType.getConversionClass(), @@ -499,6 +520,7 @@ public final class DataTypeUtils { "Unsupported logical type : " + logicalType); } return transformation.transform( + factory, new FieldsDataType( newLogicalType, fieldsDataType.getConversionClass(), newDataTypes)); } @@ -520,6 +542,7 @@ public final class DataTypeUtils { "Unsupported logical type : " + logicalType); } return transformation.transform( + factory, new KeyValueDataType( newLogicalType, keyValueDataType.getConversionClass(), diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java index 7119def..df99d69 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java @@ -150,9 +150,20 @@ public final class TypeInfoDataTypeConverter { } /** Converts the given {@link TypeInformation} into {@link DataType}. */ - @SuppressWarnings("rawtypes") public static DataType toDataType( DataTypeFactory dataTypeFactory, TypeInformation<?> typeInfo) { + return toDataType(dataTypeFactory, typeInfo, false); + } + + /** + * Converts the given {@link TypeInformation} into {@link DataType} but allows to make all + * fields nullable independent of the nullability in the serialization stack. + */ + @SuppressWarnings("rawtypes") + public static DataType toDataType( + DataTypeFactory dataTypeFactory, + TypeInformation<?> typeInfo, + boolean forceNullability) { if (typeInfo instanceof DataTypeQueryable) { return ((DataTypeQueryable) typeInfo).getDataType(); } @@ -186,7 +197,8 @@ public final class TypeInfoDataTypeConverter { ((MapTypeInfo) typeInfo).getKeyTypeInfo(), ((MapTypeInfo) typeInfo).getValueTypeInfo()); } else if (typeInfo instanceof CompositeType) { - return convertToStructuredType(dataTypeFactory, (CompositeType) typeInfo); + return convertToStructuredType( + dataTypeFactory, (CompositeType) typeInfo, forceNullability); } // treat everything else as RAW type @@ -248,7 +260,9 @@ public final class TypeInfoDataTypeConverter { } private static DataType convertToStructuredType( - DataTypeFactory dataTypeFactory, CompositeType<?> compositeType) { + DataTypeFactory dataTypeFactory, + CompositeType<?> compositeType, + boolean forceNullability) { final int arity = compositeType.getArity(); final String[] fieldNames = compositeType.getFieldNames(); final Class<?> typeClass = compositeType.getTypeClass(); @@ -299,7 +313,7 @@ public final class TypeInfoDataTypeConverter { // for tuples and case classes else { // serializers don't support top-level nulls - isNullable = false; + isNullable = forceNullability; // based on type information all fields are boxed classes, // but case classes might contain primitives diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java index c006228..fc85101 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java @@ -38,7 +38,6 @@ import java.sql.Timestamp; import java.util.List; import static org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS; -import static org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal; import static org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw; import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes; import static org.apache.flink.table.types.inference.TypeTransformations.toNullable; @@ -100,25 +99,6 @@ public class TypeTransformationsTest { } @Test - public void testLegacyDecimalToDefaultDecimal() { - DataType dataType = - DataTypes.ROW( - DataTypes.FIELD("a", DataTypes.STRING()), - DataTypes.FIELD("b", DataTypes.DECIMAL(10, 3)), - DataTypes.FIELD("c", createLegacyDecimal()), - DataTypes.FIELD("d", DataTypes.ARRAY(createLegacyDecimal()))); - - DataType expected = - DataTypes.ROW( - DataTypes.FIELD("a", DataTypes.STRING()), - DataTypes.FIELD("b", DataTypes.DECIMAL(10, 3)), - DataTypes.FIELD("c", DataTypes.DECIMAL(38, 18)), - DataTypes.FIELD("d", DataTypes.ARRAY(DataTypes.DECIMAL(38, 18)))); - - assertEquals(expected, DataTypeUtils.transform(dataType, legacyDecimalToDefaultDecimal())); - } - - @Test public void testLegacyRawToTypeInfoRaw() { DataType dataType = DataTypes.ROW( @@ -168,10 +148,6 @@ public class TypeTransformationsTest { // -------------------------------------------------------------------------------------------- - private static DataType createLegacyDecimal() { - return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC); - } - private static DataType createLegacyRaw() { return TypeConversions.fromLegacyInfoToDataType( Types.GENERIC(TypeTransformationsTest.class)); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 5c02f45..d982b70 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -41,7 +42,6 @@ import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec; import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec; import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec; import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink; -import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.TypeTransformations; import org.apache.flink.table.types.logical.LogicalType; @@ -67,6 +67,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast; import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsExplicitCast; import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast; @@ -128,7 +130,9 @@ public final class DynamicSinkUtils { boolean isOverwrite, DynamicTableSink sink, ResolvedCatalogTable table) { - final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(relBuilder); + final DataTypeFactory dataTypeFactory = + unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory(); + final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder); final TableSchema schema = table.getSchema(); List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>(); @@ -140,7 +144,8 @@ public final class DynamicSinkUtils { // 2. validate the query schema to the sink's table schema and apply cast if possible final RelNode query = - validateSchemaAndApplyImplicitCast(input, schema, sinkIdentifier, typeFactory); + validateSchemaAndApplyImplicitCast( + input, schema, sinkIdentifier, dataTypeFactory, typeFactory); relBuilder.push(query); // 3. convert the sink's table schema to the consumed data type of the sink @@ -171,12 +176,15 @@ public final class DynamicSinkUtils { RelNode query, TableSchema sinkSchema, @Nullable ObjectIdentifier sinkIdentifier, + DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory) { final RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType()); final List<RowField> queryFields = queryType.getFields(); final RowType sinkType = - (RowType) fixSinkDataType(sinkSchema.toPersistedRowDataType()).getLogicalType(); + (RowType) + fixSinkDataType(dataTypeFactory, sinkSchema.toPersistedRowDataType()) + .getLogicalType(); final List<RowField> sinkFields = sinkType.getFields(); if (queryFields.size() != sinkFields.size()) { @@ -378,14 +386,15 @@ public final class DynamicSinkUtils { tableName, cause, querySchema, sinkSchema)); } - private static DataType fixSinkDataType(DataType sinkDataType) { - // we recognize legacy decimal is the same to default decimal + private static DataType fixSinkDataType( + DataTypeFactory dataTypeFactory, DataType sinkDataType) { // we ignore NULL constraint, the NULL constraint will be checked during runtime // see StreamExecSink and BatchExecSink return DataTypeUtils.transform( + dataTypeFactory, sinkDataType, - TypeTransformations.legacyDecimalToDefaultDecimal(), TypeTransformations.legacyRawToTypeInfoRaw(), + TypeTransformations.legacyToNonLegacy(), TypeTransformations.toNullable()); } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index ad10d83..eab74f7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -175,12 +175,18 @@ abstract class PlannerBase( */ @VisibleForTesting private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = { + val dataTypeFactory = catalogManager.getDataTypeFactory modifyOperation match { case s: UnregisteredSinkModifyOperation[_] => val input = getRelBuilder.queryOperation(s.getChild).build() val sinkSchema = s.getSink.getTableSchema // validate query schema and sink schema, and apply cast if possible - val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, null, getTypeFactory) + val query = validateSchemaAndApplyImplicitCast( + input, + sinkSchema, + null, + dataTypeFactory, + getTypeFactory) LogicalLegacySink.create( query, s.getSink, @@ -194,7 +200,12 @@ abstract class PlannerBase( SelectTableSinkSchemaConverter.changeDefaultConversionClass( TableSchema.fromResolvedSchema(s.getChild.getResolvedSchema))) // validate query schema and sink schema, and apply cast if possible - val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, null, getTypeFactory) + val query = validateSchemaAndApplyImplicitCast( + input, + sinkSchema, + null, + dataTypeFactory, + getTypeFactory) val sink = createSelectTableSink(sinkSchema) s.setSelectResultProvider(sink.getSelectResultProvider) LogicalLegacySink.create( @@ -220,6 +231,7 @@ abstract class PlannerBase( input, TableSchemaUtils.getPhysicalSchema(table.getSchema), catalogSink.getTableIdentifier, + dataTypeFactory, getTypeFactory) LogicalLegacySink.create( query, @@ -259,6 +271,7 @@ abstract class PlannerBase( input, sinkPhysicalSchema, null, + dataTypeFactory, getTypeFactory) val tableSink = new DataStreamTableSink( FlinkTypeFactory.toTableSchema(query.getRowType), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala index 67f1c26..87757e8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala @@ -23,18 +23,14 @@ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, Tuple import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.table.api._ import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier} -import org.apache.flink.table.connector.sink.DynamicTableSink -import org.apache.flink.table.connector.sink.abilities.{SupportsOverwrite, SupportsPartitioning} import org.apache.flink.table.data.RowData import org.apache.flink.table.operations.CatalogSinkModifyOperation -import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.connectors.DynamicSinkUtils import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.sinks._ import org.apache.flink.table.types.DataType -import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, legacyRawToTypeInfoRaw, toNullable} -import org.apache.flink.table.types.logical.utils.LogicalTypeCasts.{supportsAvoidingCast, supportsImplicitCast} +import org.apache.flink.table.types.inference.TypeTransformations.toNullable import org.apache.flink.table.types.logical.utils.LogicalTypeChecks import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType} import org.apache.flink.table.types.utils.DataTypeUtils @@ -42,9 +38,6 @@ import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataT import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils} import org.apache.flink.types.Row -import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.RelNode - import _root_.scala.collection.JavaConversions._ /**