This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3a7b6371335d86a4f8ee41ade96c96b3fd9881a
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Apr 12 15:19:27 2021 +0200

    [FLINK-20613][table] Introduce a legacy to non-legacy data type transformation
---
 .../table/types/inference/TypeTransformation.java  | 12 +++++++
 .../table/types/inference/TypeTransformations.java | 14 ++++----
 ...n.java => LegacyToNonLegacyTransformation.java} | 40 ++++++++++++++--------
 .../flink/table/types/utils/DataTypeUtils.java     | 31 ++++++++++++++---
 .../types/utils/TypeInfoDataTypeConverter.java     | 22 +++++++++---
 .../types/inference/TypeTransformationsTest.java   | 24 -------------
 .../table/planner/connectors/DynamicSinkUtils.java | 23 +++++++++----
 .../table/planner/delegation/PlannerBase.scala     | 17 +++++++--
 .../flink/table/planner/sinks/TableSinkUtils.scala |  9 +----
 9 files changed, 120 insertions(+), 72 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java
index 97cdfd4..3a14033 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformation.java
@@ -19,12 +19,24 @@
 package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.DataType;
 
+import javax.annotation.Nullable;
+
 /** Transforms one data type to another. */
 @PublicEvolving
 public interface TypeTransformation {
 
     /** Transforms the given data type to a different data type. */
     DataType transform(DataType typeToTransform);
+
+    /**
+     * Transforms the given data type to a different data type.
+     *
+     * <p>This method provides a {@link DataTypeFactory} if available.
+     */
+    default DataType transform(@Nullable DataTypeFactory factory, DataType typeToTransform) {
+        return transform(typeToTransform);
+    }
 }
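
For illustration only, not part of this patch: a sketch of how a custom transformation can override the new factory-aware method (the class name is hypothetical). Existing implementations keep compiling because the default method delegates to the single-argument variant.

    // Hypothetical example, assuming the interface above is on the classpath.
    public class ExampleTransformation implements TypeTransformation {

        @Override
        public DataType transform(DataType typeToTransform) {
            // factory-less path used by existing callers
            return typeToTransform;
        }

        @Override
        public DataType transform(@Nullable DataTypeFactory factory, DataType typeToTransform) {
            if (factory == null) {
                // no factory available, fall back to the old path
                return transform(typeToTransform);
            }
            // factory-aware logic would go here
            return typeToTransform;
        }
    }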
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
index 72ff8d1..28afb9a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
@@ -21,8 +21,8 @@ package org.apache.flink.table.types.inference;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.transforms.DataTypeConversionClassTransformation;
-import org.apache.flink.table.types.inference.transforms.LegacyDecimalTypeTransformation;
 import org.apache.flink.table.types.inference.transforms.LegacyRawTypeTransformation;
+import org.apache.flink.table.types.inference.transforms.LegacyToNonLegacyTransformation;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.sql.Date;
@@ -59,13 +59,6 @@ public final class TypeTransformations {
     }
 
     /**
-     * Returns a type transformation that transforms legacy decimal data type to DECIMAL(38, 18).
-     */
-    public static TypeTransformation legacyDecimalToDefaultDecimal() {
-        return LegacyDecimalTypeTransformation.INSTANCE;
-    }
-
-    /**
      * Returns a type transformation that transforms LEGACY('RAW', ...) type to the RAW(..., ?)
      * type.
      */
@@ -73,6 +66,11 @@ public final class TypeTransformations {
         return LegacyRawTypeTransformation.INSTANCE;
     }
 
+    /** Returns a type transformation that transforms LEGACY(...) type to a non-legacy type. */
+    public static TypeTransformation legacyToNonLegacy() {
+        return LegacyToNonLegacyTransformation.INSTANCE;
+    }
+
     /**
      * Returns a type transformation that transforms data type to nullable data type but keeps other
      * information unchanged.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyToNonLegacyTransformation.java
similarity index 54%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyToNonLegacyTransformation.java
index b5e3057..cbd11d5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyDecimalTypeTransformation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/LegacyToNonLegacyTransformation.java
@@ -18,32 +18,42 @@
 
 package org.apache.flink.table.types.inference.transforms;
 
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeTransformation;
-import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
+
+import javax.annotation.Nullable;
 
 /**
- * This type transformation transforms the legacy decimal type (usually converted from {@link
- * org.apache.flink.api.common.typeinfo.Types#BIG_DEC}) to DECIMAL(38, 18).
+ * Transformation that applies {@link TypeInfoDataTypeConverter} on {@link
+ * LegacyTypeInformationType}.
  */
-public class LegacyDecimalTypeTransformation implements TypeTransformation {
+@Internal
+public class LegacyToNonLegacyTransformation implements TypeTransformation {
 
-    public static final TypeTransformation INSTANCE = new LegacyDecimalTypeTransformation();
+    public static final TypeTransformation INSTANCE = new LegacyToNonLegacyTransformation();
 
     @Override
     public DataType transform(DataType typeToTransform) {
-        LogicalType logicalType = typeToTransform.getLogicalType();
-        if (logicalType instanceof LegacyTypeInformationType
-                && logicalType.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
-            DataType decimalType =
-                    DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)
-                            .bridgedTo(typeToTransform.getConversionClass());
-            return logicalType.isNullable() ? decimalType : decimalType.notNull();
+        return transform(null, typeToTransform);
+    }
+
+    @Override
+    public DataType transform(@Nullable DataTypeFactory factory, DataType dataType) {
+        if (factory == null) {
+            throw new TableException(
+                    "LegacyToNonLegacyTransformation requires access to the data type factory.");
+        }
+        final LogicalType type = dataType.getLogicalType();
+        if (type instanceof LegacyTypeInformationType) {
+            return TypeInfoDataTypeConverter.toDataType(
+                    factory, ((LegacyTypeInformationType<?>) type).getTypeInformation(), true);
         }
-        return typeToTransform;
+        return dataType;
     }
 }
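
For illustration only, not part of this patch (assumes dataTypeFactory and legacyDataType are in scope):

    // The new transformation requires a factory; the single-argument
    // path delegates to transform(null, ...) and therefore throws
    // TableException by design.
    TypeTransformation toNonLegacy = TypeTransformations.legacyToNonLegacy();
    DataType converted = toNonLegacy.transform(dataTypeFactory, legacyDataType);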
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index a7c5af7..320edaf 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.types.AtomicDataType;
@@ -51,6 +52,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -228,23 +231,36 @@ public final class DataTypeUtils {
     /**
      * Transforms the given data type to a different data type using the given transformations.
      *
+     * @see #transform(DataTypeFactory, DataType, TypeTransformation...)
+     */
+    public static DataType transform(
+            DataType typeToTransform, TypeTransformation... transformations) {
+        return transform(null, typeToTransform, transformations);
+    }
+
+    /**
+     * Transforms the given data type to a different data type using the given transformations.
+     *
      * <p>The transformations will be called in the given order. In case of constructed or composite
      * types, a transformation will be applied transitively to children first.
      *
      * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can
      * be transformed.
      *
+     * @param factory {@link DataTypeFactory} if available
      * @param typeToTransform data type to be transformed.
      * @param transformations the transformations to transform data type to another type.
      * @return the new data type
      */
     public static DataType transform(
-            DataType typeToTransform, TypeTransformation... transformations) {
+            @Nullable DataTypeFactory factory,
+            DataType typeToTransform,
+            TypeTransformation... transformations) {
         Preconditions.checkArgument(
                transformations.length > 0, "transformations should not be empty.");
         DataType newType = typeToTransform;
         for (TypeTransformation transformation : transformations) {
-            newType = newType.accept(new DataTypeTransformer(transformation));
+            newType = newType.accept(new DataTypeTransformer(factory, transformation));
         }
         return newType;
     }
@@ -407,15 +423,19 @@ public final class DataTypeUtils {
      */
     private static class DataTypeTransformer implements DataTypeVisitor<DataType> {
 
+        private final @Nullable DataTypeFactory factory;
+
         private final TypeTransformation transformation;
 
-        private DataTypeTransformer(TypeTransformation transformation) {
+        private DataTypeTransformer(
+                @Nullable DataTypeFactory factory, TypeTransformation transformation) {
+            this.factory = factory;
             this.transformation = transformation;
         }
 
         @Override
         public DataType visit(AtomicDataType atomicDataType) {
-            return transformation.transform(atomicDataType);
+            return transformation.transform(factory, atomicDataType);
         }
 
         @Override
@@ -434,6 +454,7 @@ public final class DataTypeUtils {
                         "Unsupported logical type : " + logicalType);
             }
             return transformation.transform(
+                    factory,
                     new CollectionDataType(
                             newLogicalType,
                             collectionDataType.getConversionClass(),
@@ -499,6 +520,7 @@ public final class DataTypeUtils {
                         "Unsupported logical type : " + logicalType);
             }
             return transformation.transform(
+                    factory,
                     new FieldsDataType(
                             newLogicalType, 
fieldsDataType.getConversionClass(), newDataTypes));
         }
@@ -520,6 +542,7 @@ public final class DataTypeUtils {
                         "Unsupported logical type : " + logicalType);
             }
             return transformation.transform(
+                    factory,
                     new KeyValueDataType(
                             newLogicalType,
                             keyValueDataType.getConversionClass(),
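
For illustration only, not part of this patch: the new factory-aware overload of DataTypeUtils.transform (dataTypeFactory and inputType are assumed to be in scope):

    // Transformations run in the given order; for constructed and
    // composite types they are applied to children first.
    DataType result =
            DataTypeUtils.transform(
                    dataTypeFactory,
                    inputType,
                    TypeTransformations.legacyToNonLegacy(),
                    TypeTransformations.toNullable());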
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java
index 7119def..df99d69 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java
@@ -150,9 +150,20 @@ public final class TypeInfoDataTypeConverter {
     }
 
     /** Converts the given {@link TypeInformation} into {@link DataType}. */
-    @SuppressWarnings("rawtypes")
     public static DataType toDataType(
             DataTypeFactory dataTypeFactory, TypeInformation<?> typeInfo) {
+        return toDataType(dataTypeFactory, typeInfo, false);
+    }
+
+    /**
+     * Converts the given {@link TypeInformation} into {@link DataType} but allows to make all
+     * fields nullable independent of the nullability in the serialization stack.
+     */
+    @SuppressWarnings("rawtypes")
+    public static DataType toDataType(
+            DataTypeFactory dataTypeFactory,
+            TypeInformation<?> typeInfo,
+            boolean forceNullability) {
         if (typeInfo instanceof DataTypeQueryable) {
             return ((DataTypeQueryable) typeInfo).getDataType();
         }
@@ -186,7 +197,8 @@ public final class TypeInfoDataTypeConverter {
                     ((MapTypeInfo) typeInfo).getKeyTypeInfo(),
                     ((MapTypeInfo) typeInfo).getValueTypeInfo());
         } else if (typeInfo instanceof CompositeType) {
-            return convertToStructuredType(dataTypeFactory, (CompositeType) typeInfo);
+            return convertToStructuredType(
+                    dataTypeFactory, (CompositeType) typeInfo, forceNullability);
         }
 
         // treat everything else as RAW type
@@ -248,7 +260,9 @@ public final class TypeInfoDataTypeConverter {
     }
 
     private static DataType convertToStructuredType(
-            DataTypeFactory dataTypeFactory, CompositeType<?> compositeType) {
+            DataTypeFactory dataTypeFactory,
+            CompositeType<?> compositeType,
+            boolean forceNullability) {
         final int arity = compositeType.getArity();
         final String[] fieldNames = compositeType.getFieldNames();
         final Class<?> typeClass = compositeType.getTypeClass();
@@ -299,7 +313,7 @@ public final class TypeInfoDataTypeConverter {
         // for tuples and case classes
         else {
             // serializers don't support top-level nulls
-            isNullable = false;
+            isNullable = forceNullability;
 
             // based on type information all fields are boxed classes,
             // but case classes might contain primitives
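
For illustration only, not part of this patch: the effect of the new forceNullability flag (dataTypeFactory is assumed to be in scope):

    // Tuple serializers do not support top-level nulls, so the converter
    // normally produces a NOT NULL structured type; forceNullability = true
    // overrides this, which the sink validation path below relies on.
    DataType dt =
            TypeInfoDataTypeConverter.toDataType(
                    dataTypeFactory,
                    org.apache.flink.api.common.typeinfo.Types.TUPLE(
                            org.apache.flink.api.common.typeinfo.Types.INT,
                            org.apache.flink.api.common.typeinfo.Types.STRING),
                    true);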
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
index c006228..fc85101 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
@@ -38,7 +38,6 @@ import java.sql.Timestamp;
 import java.util.List;
 
 import static org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS;
-import static org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal;
 import static org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw;
 import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes;
 import static org.apache.flink.table.types.inference.TypeTransformations.toNullable;
@@ -100,25 +99,6 @@ public class TypeTransformationsTest {
     }
 
     @Test
-    public void testLegacyDecimalToDefaultDecimal() {
-        DataType dataType =
-                DataTypes.ROW(
-                        DataTypes.FIELD("a", DataTypes.STRING()),
-                        DataTypes.FIELD("b", DataTypes.DECIMAL(10, 3)),
-                        DataTypes.FIELD("c", createLegacyDecimal()),
-                        DataTypes.FIELD("d", DataTypes.ARRAY(createLegacyDecimal())));
-
-        DataType expected =
-                DataTypes.ROW(
-                        DataTypes.FIELD("a", DataTypes.STRING()),
-                        DataTypes.FIELD("b", DataTypes.DECIMAL(10, 3)),
-                        DataTypes.FIELD("c", DataTypes.DECIMAL(38, 18)),
-                        DataTypes.FIELD("d", DataTypes.ARRAY(DataTypes.DECIMAL(38, 18))));
-
-        assertEquals(expected, DataTypeUtils.transform(dataType, legacyDecimalToDefaultDecimal()));
-    }
-
-    @Test
     public void testLegacyRawToTypeInfoRaw() {
         DataType dataType =
                 DataTypes.ROW(
@@ -168,10 +148,6 @@ public class TypeTransformationsTest {
 
    // --------------------------------------------------------------------------------------------
 
-    private static DataType createLegacyDecimal() {
-        return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
-    }
-
     private static DataType createLegacyRaw() {
         return TypeConversions.fromLegacyInfoToDataType(
                 Types.GENERIC(TypeTransformationsTest.class));
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 5c02f45..d982b70 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -41,7 +42,6 @@ import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
-import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeTransformations;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -67,6 +67,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsExplicitCast;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
@@ -128,7 +130,9 @@ public final class DynamicSinkUtils {
             boolean isOverwrite,
             DynamicTableSink sink,
             ResolvedCatalogTable table) {
-        final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(relBuilder);
+        final DataTypeFactory dataTypeFactory =
+                unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder);
         final TableSchema schema = table.getSchema();
 
         List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>();
@@ -140,7 +144,8 @@ public final class DynamicSinkUtils {
 
        // 2. validate the query schema to the sink's table schema and apply cast if possible
         final RelNode query =
-                validateSchemaAndApplyImplicitCast(input, schema, sinkIdentifier, typeFactory);
+                validateSchemaAndApplyImplicitCast(
+                        input, schema, sinkIdentifier, dataTypeFactory, typeFactory);
         relBuilder.push(query);
 
        // 3. convert the sink's table schema to the consumed data type of the sink
@@ -171,12 +176,15 @@ public final class DynamicSinkUtils {
             RelNode query,
             TableSchema sinkSchema,
             @Nullable ObjectIdentifier sinkIdentifier,
+            DataTypeFactory dataTypeFactory,
             FlinkTypeFactory typeFactory) {
        final RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType());
         final List<RowField> queryFields = queryType.getFields();
 
         final RowType sinkType =
-                (RowType) fixSinkDataType(sinkSchema.toPersistedRowDataType()).getLogicalType();
+                (RowType)
+                        fixSinkDataType(dataTypeFactory, sinkSchema.toPersistedRowDataType())
+                                .getLogicalType();
         final List<RowField> sinkFields = sinkType.getFields();
 
         if (queryFields.size() != sinkFields.size()) {
@@ -378,14 +386,15 @@ public final class DynamicSinkUtils {
                         tableName, cause, querySchema, sinkSchema));
     }
 
-    private static DataType fixSinkDataType(DataType sinkDataType) {
-        // we recognize legacy decimal is the same to default decimal
+    private static DataType fixSinkDataType(
+            DataTypeFactory dataTypeFactory, DataType sinkDataType) {
        // we ignore NULL constraint, the NULL constraint will be checked during runtime
         // see StreamExecSink and BatchExecSink
         return DataTypeUtils.transform(
+                dataTypeFactory,
                 sinkDataType,
-                TypeTransformations.legacyDecimalToDefaultDecimal(),
                 TypeTransformations.legacyRawToTypeInfoRaw(),
+                TypeTransformations.legacyToNonLegacy(),
                 TypeTransformations.toNullable());
     }
 
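For illustration only, not part of this patch: the reworked fixSinkDataType expressed through the public utilities; legacyDecimalToDefaultDecimal is subsumed by the broader legacyToNonLegacy transformation.

    // Equivalent of the new private fixSinkDataType (sketch; variables
    // dataTypeFactory and sinkDataType assumed in scope).
    DataType fixed =
            DataTypeUtils.transform(
                    dataTypeFactory,
                    sinkDataType,
                    TypeTransformations.legacyRawToTypeInfoRaw(),
                    TypeTransformations.legacyToNonLegacy(),
                    TypeTransformations.toNullable());
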
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index ad10d83..eab74f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -175,12 +175,18 @@ abstract class PlannerBase(
     */
   @VisibleForTesting
  private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
+    val dataTypeFactory = catalogManager.getDataTypeFactory
     modifyOperation match {
       case s: UnregisteredSinkModifyOperation[_] =>
         val input = getRelBuilder.queryOperation(s.getChild).build()
         val sinkSchema = s.getSink.getTableSchema
         // validate query schema and sink schema, and apply cast if possible
-        val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, null, getTypeFactory)
+        val query = validateSchemaAndApplyImplicitCast(
+          input,
+          sinkSchema,
+          null,
+          dataTypeFactory,
+          getTypeFactory)
         LogicalLegacySink.create(
           query,
           s.getSink,
@@ -194,7 +200,12 @@ abstract class PlannerBase(
           SelectTableSinkSchemaConverter.changeDefaultConversionClass(
             TableSchema.fromResolvedSchema(s.getChild.getResolvedSchema)))
         // validate query schema and sink schema, and apply cast if possible
-        val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, null, getTypeFactory)
+        val query = validateSchemaAndApplyImplicitCast(
+          input,
+          sinkSchema,
+          null,
+          dataTypeFactory,
+          getTypeFactory)
         val sink = createSelectTableSink(sinkSchema)
         s.setSelectResultProvider(sink.getSelectResultProvider)
         LogicalLegacySink.create(
@@ -220,6 +231,7 @@ abstract class PlannerBase(
               input,
               TableSchemaUtils.getPhysicalSchema(table.getSchema),
               catalogSink.getTableIdentifier,
+              dataTypeFactory,
               getTypeFactory)
             LogicalLegacySink.create(
               query,
@@ -259,6 +271,7 @@ abstract class PlannerBase(
           input,
           sinkPhysicalSchema,
           null,
+          dataTypeFactory,
           getTypeFactory)
         val tableSink = new DataStreamTableSink(
           FlinkTypeFactory.toTableSchema(query.getRowType),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 67f1c26..87757e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -23,18 +23,14 @@ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, Tuple
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
-import org.apache.flink.table.connector.sink.DynamicTableSink
-import org.apache.flink.table.connector.sink.abilities.{SupportsOverwrite, SupportsPartitioning}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, legacyRawToTypeInfoRaw, toNullable}
-import org.apache.flink.table.types.logical.utils.LogicalTypeCasts.{supportsAvoidingCast, supportsImplicitCast}
+import org.apache.flink.table.types.inference.TypeTransformations.toNullable
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
 import org.apache.flink.table.types.utils.DataTypeUtils
@@ -42,9 +38,6 @@ import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataT
 import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
 import org.apache.flink.types.Row
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-
 import _root_.scala.collection.JavaConversions._
 
 /**
