This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new ff7b4f6 [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993) ff7b4f6 is described below commit ff7b4f6f0e073477b7a93dbda8243e7fc8647f50 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue May 5 20:04:20 2020 +0200 [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993) Co-authored-by: Terry Wang <zjuwa...@foxmail.com> --- .../apache/flink/table/utils/TypeMappingUtils.java | 115 ++++++++++++++++----- .../flink/table/utils/TypeMappingUtilsTest.java | 68 ++++++++++++ 2 files changed, 159 insertions(+), 24 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java index 8239100..e284787 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java @@ -32,6 +32,8 @@ import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.flink.table.types.utils.DataTypeUtils; @@ -46,7 +48,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; /** * Utility methods for dealing with field types in {@link org.apache.flink.table.sources.TableSource} @@ -157,19 +161,35 @@ public final class TypeMappingUtils { String physicalFieldName, String logicalFieldName, boolean isSource) { - checkIfCompatible( - physicalFieldType, - logicalFieldType, - (cause) -> new ValidationException( + Function<Throwable, ValidationException> exceptionSupplier = (cause) -> + new ValidationException( String.format( "Type %s of table field '%s' does not match with " + - "the physical type %s of the '%s' field of the %s.", + "the physical type %s of the '%s' field of the %s type.", logicalFieldType, logicalFieldName, physicalFieldType, physicalFieldName, - isSource ? "TableSource return type" : "TableSink consumed type"), - cause)); + isSource ? "TableSource return" : "TableSink consumed"), + cause); + try { + final boolean typesCompatible; + if (isSource) { + typesCompatible = checkIfCompatible( + physicalFieldType, + logicalFieldType); + } else { + typesCompatible = checkIfCompatible( + logicalFieldType, + physicalFieldType); + } + + if (!typesCompatible) { + throw exceptionSupplier.apply(null); + } + } catch (Exception e) { + throw exceptionSupplier.apply(e); + } } private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) { @@ -243,38 +263,85 @@ public final class TypeMappingUtils { ); } - private static void checkIfCompatible( - LogicalType physicalFieldType, - LogicalType logicalFieldType, - Function<Throwable, ValidationException> exceptionSupplier) { - if (LogicalTypeChecks.areTypesCompatible(physicalFieldType, logicalFieldType)) { - return; + private static boolean checkIfCompatible( + LogicalType sourceType, + LogicalType targetType) { + if (LogicalTypeChecks.areTypesCompatible(sourceType, targetType)) { + return true; + } + + Boolean targetTypeCompatible = targetType.accept(new LogicalTypeDefaultVisitor<Boolean>() { + + @Override + public Boolean visit(VarCharType targetType) { + if (sourceType.isNullable() && !targetType.isNullable()) { + return false; + } + // CHAR and VARCHAR are very compatible within bounds + if ((hasRoot(sourceType, LogicalTypeRoot.CHAR) || hasRoot(sourceType, LogicalTypeRoot.VARCHAR)) && + getLength(sourceType) <= targetType.getLength()) { + return true; + } + return defaultMethod(targetType); + } + + @Override + public Boolean visit(VarBinaryType targetType) { + if (sourceType.isNullable() && !targetType.isNullable()) { + return false; + } + // BINARY and VARBINARY are very compatible within bounds + if ((hasRoot(sourceType, LogicalTypeRoot.BINARY) || hasRoot(sourceType, LogicalTypeRoot.VARBINARY)) && + getLength(sourceType) <= targetType.getLength()) { + return true; + } + return defaultMethod(targetType); + } + + @Override + protected Boolean defaultMethod(LogicalType logicalType) { + return false; + } + }); + + if (targetTypeCompatible) { + return true; } - physicalFieldType.accept(new LogicalTypeDefaultVisitor<Void>() { + return sourceType.accept(new LogicalTypeDefaultVisitor<Boolean>() { + @Override + public Boolean visit(DecimalType sourceType1) { + //When targetType is a legacy decimal type, pass the check. + if (targetType instanceof LegacyTypeInformationType + && targetType.getTypeRoot() == LogicalTypeRoot.DECIMAL) { + return true; + } + return defaultMethod(sourceType1); + } + @Override - public Void visit(LogicalType other) { + public Boolean visit(LogicalType other) { if (other instanceof LegacyTypeInformationType && other.getTypeRoot() == LogicalTypeRoot.DECIMAL) { - if (!(logicalFieldType instanceof DecimalType)) { - throw exceptionSupplier.apply(null); + if (!(targetType instanceof DecimalType)) { + return false; } - DecimalType logicalDecimalType = (DecimalType) logicalFieldType; + DecimalType logicalDecimalType = (DecimalType) targetType; if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION || - logicalDecimalType.getScale() != 18) { - throw exceptionSupplier.apply(new ValidationException( - "Legacy decimal type can only be mapped to DECIMAL(38, 18).")); + logicalDecimalType.getScale() != 18) { + throw new ValidationException( + "Legacy decimal type can only be mapped to DECIMAL(38, 18)."); } - return null; + return true; } return defaultMethod(other); } @Override - protected Void defaultMethod(LogicalType logicalType) { - throw exceptionSupplier.apply(null); + protected Boolean defaultMethod(LogicalType logicalType) { + return false; } }); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java index daadd66..ab15581 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java @@ -18,16 +18,23 @@ package org.apache.flink.table.utils; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.DefinedProctimeAttribute; import org.apache.flink.table.sources.DefinedRowtimeAttributes; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; import org.junit.Rule; import org.junit.Test; @@ -331,6 +338,30 @@ public class TypeMappingUtilsTest { ); } + @Test + public void testCheckPhysicalLogicalTypeCompatible() { + TableSchema tableSchema = TableSchema.builder() + .field("a", DataTypes.VARCHAR(2)) + .field("b", DataTypes.DECIMAL(20, 2)) + .build(); + TableSink<Tuple2<Boolean, Row>> tableSink = new TestTableSink(tableSchema); + LegacyTypeInformationType<?> legacyDataType = (LegacyTypeInformationType<?>) tableSink.getConsumedDataType() + .getLogicalType(); + TypeInformation<?> legacyTypeInfo = ((TupleTypeInfo<?>) legacyDataType.getTypeInformation()).getTypeAt(1); + DataType physicalType = TypeConversions.fromLegacyInfoToDataType(legacyTypeInfo); + TableSchema physicSchema = DataTypeUtils.expandCompositeTypeToSchema(physicalType); + DataType[] logicalDataTypes = tableSchema.getFieldDataTypes(); + DataType[] physicalDataTypes = physicSchema.getFieldDataTypes(); + for (int i = 0; i < logicalDataTypes.length; i++) { + TypeMappingUtils.checkPhysicalLogicalTypeCompatible( + physicalDataTypes[i].getLogicalType(), + logicalDataTypes[i].getLogicalType(), + "physicalField", + "logicalField", + false); + } + } + private static class TestTableSource implements TableSource<Object>, DefinedProctimeAttribute, DefinedRowtimeAttributes { @@ -370,4 +401,41 @@ public class TypeMappingUtilsTest { throw new UnsupportedOperationException("Should not be called"); } } + + /** + * Since UpsertStreamTableSink not in flink-table-common module, here we use Tuple2 <Boolean, Row> to + * simulate the behavior of UpsertStreamTableSink. + */ + private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> { + private final TableSchema tableSchema; + + private TestTableSink(TableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + TypeInformation<Row> getRecordType() { + return tableSchema.toRowType(); + } + + @Override + public TypeInformation<Tuple2<Boolean, Row>> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } + + @Override + public String[] getFieldNames() { + return tableSchema.getFieldNames(); + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return tableSchema.getFieldTypes(); + } + + @Override + public TableSink<Tuple2<Boolean, Row>> configure( + String[] fieldNames, TypeInformation<?>[] fieldTypes) { + return null; + } + } }