This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new ff7b4f6  [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)
ff7b4f6 is described below

commit ff7b4f6f0e073477b7a93dbda8243e7fc8647f50
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue May 5 20:04:20 2020 +0200

    [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)
    
    Co-authored-by: Terry Wang <zjuwa...@foxmail.com>
---
 .../apache/flink/table/utils/TypeMappingUtils.java | 115 ++++++++++++++++-----
 .../flink/table/utils/TypeMappingUtilsTest.java    |  68 ++++++++++++
 2 files changed, 159 insertions(+), 24 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
index 8239100..e284787 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -46,7 +48,9 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
 
 /**
  * Utility methods for dealing with field types in {@link 
org.apache.flink.table.sources.TableSource}
@@ -157,19 +161,35 @@ public final class TypeMappingUtils {
                        String physicalFieldName,
                        String logicalFieldName,
                        boolean isSource) {
-               checkIfCompatible(
-                       physicalFieldType,
-                       logicalFieldType,
-                       (cause) -> new ValidationException(
+               Function<Throwable, ValidationException> exceptionSupplier = 
(cause) ->
+                       new ValidationException(
                                String.format(
                                        "Type %s of table field '%s' does not 
match with " +
-                                               "the physical type %s of the 
'%s' field of the %s.",
+                                               "the physical type %s of the 
'%s' field of the %s type.",
                                        logicalFieldType,
                                        logicalFieldName,
                                        physicalFieldType,
                                        physicalFieldName,
-                                       isSource ? "TableSource return type" : 
"TableSink consumed type"),
-                               cause));
+                                       isSource ? "TableSource return" : 
"TableSink consumed"),
+                               cause);
+               try {
+                       final boolean typesCompatible;
+                       if (isSource) {
+                               typesCompatible = checkIfCompatible(
+                                       physicalFieldType,
+                                       logicalFieldType);
+                       } else {
+                               typesCompatible = checkIfCompatible(
+                                       logicalFieldType,
+                                       physicalFieldType);
+                       }
+
+                       if (!typesCompatible) {
+                               throw exceptionSupplier.apply(null);
+                       }
+               } catch (Exception e) {
+                       throw exceptionSupplier.apply(e);
+               }
        }
 
        private static void verifyTimeAttributeType(TableColumn logicalColumn, 
String rowtimeOrProctime) {
@@ -243,38 +263,85 @@ public final class TypeMappingUtils {
                );
        }
 
-       private static void checkIfCompatible(
-                       LogicalType physicalFieldType,
-                       LogicalType logicalFieldType,
-                       Function<Throwable, ValidationException> 
exceptionSupplier) {
-               if (LogicalTypeChecks.areTypesCompatible(physicalFieldType, 
logicalFieldType)) {
-                       return;
+       private static boolean checkIfCompatible(
+                       LogicalType sourceType,
+                       LogicalType targetType) {
+               if (LogicalTypeChecks.areTypesCompatible(sourceType, 
targetType)) {
+                       return true;
+               }
+
+               Boolean targetTypeCompatible = targetType.accept(new 
LogicalTypeDefaultVisitor<Boolean>() {
+
+                       @Override
+                       public Boolean visit(VarCharType targetType) {
+                               if (sourceType.isNullable() && 
!targetType.isNullable()) {
+                                       return false;
+                               }
+                               // CHAR and VARCHAR are compatible when the source length fits within the target length
+                               if ((hasRoot(sourceType, LogicalTypeRoot.CHAR) 
|| hasRoot(sourceType, LogicalTypeRoot.VARCHAR)) &&
+                                       getLength(sourceType) <= 
targetType.getLength()) {
+                                       return true;
+                               }
+                               return defaultMethod(targetType);
+                       }
+
+                       @Override
+                       public Boolean visit(VarBinaryType targetType) {
+                               if (sourceType.isNullable() && 
!targetType.isNullable()) {
+                                       return false;
+                               }
+                               // BINARY and VARBINARY are compatible when the source length fits within the target length
+                               if ((hasRoot(sourceType, 
LogicalTypeRoot.BINARY) || hasRoot(sourceType, LogicalTypeRoot.VARBINARY)) &&
+                                       getLength(sourceType) <= 
targetType.getLength()) {
+                                       return true;
+                               }
+                               return defaultMethod(targetType);
+                       }
+
+                       @Override
+                       protected Boolean defaultMethod(LogicalType 
logicalType) {
+                               return false;
+                       }
+               });
+
+               if (targetTypeCompatible) {
+                       return true;
                }
 
-               physicalFieldType.accept(new LogicalTypeDefaultVisitor<Void>() {
+               return sourceType.accept(new 
LogicalTypeDefaultVisitor<Boolean>() {
+                       @Override
+                       public Boolean visit(DecimalType sourceType1) {
+                               // When targetType is a legacy decimal type, pass the check.
+                               if (targetType instanceof 
LegacyTypeInformationType
+                                       && targetType.getTypeRoot() == 
LogicalTypeRoot.DECIMAL) {
+                                       return true;
+                               }
+                               return defaultMethod(sourceType1);
+                       }
+
                        @Override
-                       public Void visit(LogicalType other) {
+                       public Boolean visit(LogicalType other) {
                                if (other instanceof LegacyTypeInformationType 
&& other.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
-                                       if (!(logicalFieldType instanceof 
DecimalType)) {
-                                               throw 
exceptionSupplier.apply(null);
+                                       if (!(targetType instanceof 
DecimalType)) {
+                                               return false;
                                        }
 
-                                       DecimalType logicalDecimalType = 
(DecimalType) logicalFieldType;
+                                       DecimalType logicalDecimalType = 
(DecimalType) targetType;
                                        if (logicalDecimalType.getPrecision() 
!= DecimalType.MAX_PRECISION ||
-                                                       
logicalDecimalType.getScale() != 18) {
-                                               throw 
exceptionSupplier.apply(new ValidationException(
-                                                       "Legacy decimal type 
can only be mapped to DECIMAL(38, 18)."));
+                                               logicalDecimalType.getScale() 
!= 18) {
+                                               throw new ValidationException(
+                                                       "Legacy decimal type 
can only be mapped to DECIMAL(38, 18).");
                                        }
 
-                                       return null;
+                                       return true;
                                }
 
                                return defaultMethod(other);
                        }
 
                        @Override
-                       protected Void defaultMethod(LogicalType logicalType) {
-                               throw exceptionSupplier.apply(null);
+                       protected Boolean defaultMethod(LogicalType 
logicalType) {
+                               return false;
                        }
                });
        }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
index daadd66..ab15581 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
@@ -18,16 +18,23 @@
 
 package org.apache.flink.table.utils;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -331,6 +338,30 @@ public class TypeMappingUtilsTest {
                );
        }
 
+       @Test
+       public void testCheckPhysicalLogicalTypeCompatible() {
+               TableSchema tableSchema = TableSchema.builder()
+                                                               .field("a", 
DataTypes.VARCHAR(2))
+                                                               .field("b", 
DataTypes.DECIMAL(20, 2))
+                                                               .build();
+               TableSink<Tuple2<Boolean, Row>> tableSink = new 
TestTableSink(tableSchema);
+               LegacyTypeInformationType<?> legacyDataType = 
(LegacyTypeInformationType<?>) tableSink.getConsumedDataType()
+                                                                               
                                .getLogicalType();
+               TypeInformation<?> legacyTypeInfo = ((TupleTypeInfo<?>) 
legacyDataType.getTypeInformation()).getTypeAt(1);
+               DataType physicalType = 
TypeConversions.fromLegacyInfoToDataType(legacyTypeInfo);
+               TableSchema physicSchema = 
DataTypeUtils.expandCompositeTypeToSchema(physicalType);
+               DataType[] logicalDataTypes = tableSchema.getFieldDataTypes();
+               DataType[] physicalDataTypes = physicSchema.getFieldDataTypes();
+               for (int i = 0; i < logicalDataTypes.length; i++) {
+                       TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
+                                       physicalDataTypes[i].getLogicalType(),
+                                       logicalDataTypes[i].getLogicalType(),
+                                       "physicalField",
+                                       "logicalField",
+                                       false);
+               }
+       }
+
        private static class TestTableSource
                implements TableSource<Object>, DefinedProctimeAttribute, 
DefinedRowtimeAttributes {
 
@@ -370,4 +401,41 @@ public class TypeMappingUtilsTest {
                        throw new UnsupportedOperationException("Should not be 
called");
                }
        }
+
+       /**
+        * Since UpsertStreamTableSink is not in the flink-table-common module, we use
+        * Tuple2&lt;Boolean, Row&gt; here to simulate the behavior of UpsertStreamTableSink.
+        */
+       private static class TestTableSink implements TableSink<Tuple2<Boolean, 
Row>> {
+               private final TableSchema tableSchema;
+
+               private TestTableSink(TableSchema tableSchema) {
+                       this.tableSchema = tableSchema;
+               }
+
+               TypeInformation<Row> getRecordType() {
+                       return tableSchema.toRowType();
+               }
+
+               @Override
+               public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+                       return new TupleTypeInfo<>(Types.BOOLEAN, 
getRecordType());
+               }
+
+               @Override
+               public String[] getFieldNames() {
+                       return tableSchema.getFieldNames();
+               }
+
+               @Override
+               public TypeInformation<?>[] getFieldTypes() {
+                       return tableSchema.getFieldTypes();
+               }
+
+               @Override
+               public TableSink<Tuple2<Boolean, Row>> configure(
+                               String[] fieldNames, TypeInformation<?>[] 
fieldTypes) {
+                       return null;
+               }
+       }
 }

Reply via email to