This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e1b1fc417d4a7aa4aba50875084f5cf84cc58a22 Author: Jark Wu <j...@apache.org> AuthorDate: Tue Dec 10 23:43:03 2019 +0800 [FLINK-15124][table] Fix types with precision defined in DDL can't be executed --- .../flink/table/sinks/CsvTableSinkFactoryBase.java | 27 ++- .../apache/flink/table/sources/CsvTableSource.java | 93 +++++++++-- .../table/sources/CsvTableSourceFactoryBase.java | 4 +- .../table/planner/codegen/SinkCodeGenerator.scala | 6 +- .../physical/batch/BatchExecTableSourceScan.scala | 15 +- .../stream/StreamExecTableSourceScan.scala | 15 +- .../flink/table/planner/sinks/TableSinkUtils.scala | 5 +- .../table/planner/sources/TableSourceUtil.scala | 133 ++++++++++----- .../table/planner/catalog/CatalogTableITCase.scala | 92 ++++++++++- .../table/validation/TableSinkValidationTest.scala | 8 +- .../runtime/types/DataTypePrecisionFixer.java | 170 +++++++++++++++++++ .../table/runtime/types/PlannerTypeUtils.java | 9 +- .../runtime/types/DataTypePrecisionFixerTest.java | 183 +++++++++++++++++++++ .../runtime/types/LogicalTypeAssignableTest.java | 2 +- 14 files changed, 682 insertions(+), 80 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java index ce4958c..f530fec 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java @@ -19,6 +19,7 @@ package org.apache.flink.table.sinks; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.descriptors.DescriptorProperties; @@ -27,8 +28,13 @@ import org.apache.flink.table.descriptors.FormatDescriptorValidator; import org.apache.flink.table.descriptors.OldCsvValidator; import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.table.types.utils.TypeConversions; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -75,6 +81,8 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory { properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE); properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE); properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME); + // schema watermark + properties.add(SCHEMA + "." 
+ DescriptorProperties.WATERMARK + ".*"); return properties; } @@ -108,7 +116,24 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory { CsvTableSink csvTableSink = new CsvTableSink(path, fieldDelimiter); - return (CsvTableSink) csvTableSink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes()); + // bridge to java.sql.Timestamp/Time/Date + TypeInformation<?>[] typeInfos = Arrays.stream(tableSchema.getFieldDataTypes()) + .map(dt -> { + switch (dt.getLogicalType().getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + return dt.bridgedTo(Timestamp.class); + case TIME_WITHOUT_TIME_ZONE: + return dt.bridgedTo(Time.class); + case DATE: + return dt.bridgedTo(Date.class); + default: + return dt; + } + }) + .map(TypeConversions::fromDataTypeToLegacyInfo) + .toArray(TypeInformation[]::new); + + return (CsvTableSink) csvTableSink.configure(tableSchema.getFieldNames(), typeInfos); } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java index becf4de..ec12b20 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java @@ -28,13 +28,19 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import java.io.Serializable; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -44,6 +50,8 @@ import java.util.Map; import java.util.Objects; import java.util.stream.IntStream; +import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; + /** * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a * (logically) unlimited number of fields. 
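[Reviewer's illustration, not part of the patch: the sink-side change above boils down to swapping only the conversion class on the DataType, so the declared precision survives until the fallback to legacy TypeInformation. A minimal sketch, assuming Flink 1.10's table API; the class name BridgingSketch is invented for the example.]

import java.sql.Timestamp;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

public class BridgingSketch {
    public static void main(String[] args) {
        // TIMESTAMP(3) as declared in DDL; the CSV connector can only read/write
        // java.sql.Timestamp, so the conversion class is swapped while the
        // logical type (and its precision) stays intact.
        DataType ddlType = DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class);
        // The legacy TypeInformation (SQL_TIMESTAMP) carries no precision at all;
        // that loss is what the planner-side precision fixing below compensates for.
        TypeInformation<?> legacy = TypeConversions.fromDataTypeToLegacyInfo(ddlType);
        System.out.println(ddlType + " -> " + legacy);
    }
}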
@@ -129,7 +137,7 @@ public class CsvTableSource String ignoreComments, boolean lenient) { this(new CsvInputFormatConfig( - path, fieldNames, fieldTypes, selectedFields, + path, fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes), selectedFields, fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient, false)); } @@ -155,13 +163,23 @@ public class CsvTableSource } @Override - public TypeInformation<Row> getReturnType() { - return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames()); + public DataType getProducedDataType() { + return TableSchema.builder() + .fields(config.getSelectedFieldNames(), config.getSelectedFieldDataTypes()) + .build() + .toRowDataType(); + } + + @SuppressWarnings("unchecked") + private TypeInformation<Row> getProducedTypeInformation() { + return (TypeInformation<Row>) fromDataTypeToLegacyInfo(getProducedDataType()); } @Override public TableSchema getTableSchema() { - return new TableSchema(config.fieldNames, config.fieldTypes); + return TableSchema.builder() + .fields(config.fieldNames, config.fieldTypes) + .build(); } @Override @@ -179,12 +197,16 @@ public class CsvTableSource @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { - return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource()); + return execEnv + .createInput(config.createInputFormat(), getProducedTypeInformation()) + .name(explainSource()); } @Override public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) { - return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource()); + return execEnv + .createInput(config.createInputFormat(), getProducedTypeInformation()) + .name(explainSource()); } @Override @@ -229,7 +251,7 @@ public class CsvTableSource * A builder for creating CsvTableSource instances. */ public static class Builder { - private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>(); + private LinkedHashMap<String, DataType> schema = new LinkedHashMap<>(); private Character quoteCharacter; private String path; private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER; @@ -270,22 +292,55 @@ public class CsvTableSource } /** - * Adds a field with the field name and the type information. Required. This method can be + * Adds a field with the field name and the data type. Required. This method can be * called multiple times. The call order of this method also defines the order of the fields * in a row. * * @param fieldName the field name - * @param fieldType the type information of the field + * @param fieldType the data type of the field */ - public Builder field(String fieldName, TypeInformation<?> fieldType) { + public Builder field(String fieldName, DataType fieldType) { if (schema.containsKey(fieldName)) { throw new IllegalArgumentException("Duplicate field name " + fieldName); } - schema.put(fieldName, fieldType); + // CSV only supports java.sql.Timestamp/Date/Time + DataType type; + switch (fieldType.getLogicalType().getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + type = fieldType.bridgedTo(Timestamp.class); + break; + case TIME_WITHOUT_TIME_ZONE: + type = fieldType.bridgedTo(Time.class); + break; + case DATE: + type = fieldType.bridgedTo(Date.class); + break; + default: + type = fieldType; + } + schema.put(fieldName, type); return this; } /** + * Adds a field with the field name and the type information. Required. This method can be + * called multiple times.
The call order of this method also defines the order of the fields + * in a row. + * + * @param fieldName the field name + * @param fieldType the type information of the field + * @deprecated This method will be removed in future versions as it uses the old type system. It + * is recommended to use {@link #field(String, DataType)} instead which uses the new type + * system based on {@link DataTypes}. Please make sure to use either the old or the new + * type system consistently to avoid unintended behavior. See the website documentation + * for more information. + */ + @Deprecated + public Builder field(String fieldName, TypeInformation<?> fieldType) { + return field(fieldName, TypeConversions.fromLegacyInfoToDataType(fieldType)); + } + + /** * Sets a quote character for String values, null by default. * * @param quote the quote character @@ -345,7 +400,7 @@ public class CsvTableSource return new CsvTableSource(new CsvInputFormatConfig( path, schema.keySet().toArray(new String[0]), - schema.values().toArray(new TypeInformation<?>[0]), + schema.values().toArray(new DataType[0]), IntStream.range(0, schema.values().size()).toArray(), fieldDelim, lineDelim, @@ -459,7 +514,7 @@ public class CsvTableSource private final String path; private final String[] fieldNames; - private final TypeInformation<?>[] fieldTypes; + private final DataType[] fieldTypes; private final int[] selectedFields; private final String fieldDelim; @@ -473,7 +528,7 @@ public class CsvTableSource CsvInputFormatConfig( String path, String[] fieldNames, - TypeInformation<?>[] fieldTypes, + DataType[] fieldTypes, int[] selectedFields, String fieldDelim, String lineDelim, @@ -504,14 +559,20 @@ public class CsvTableSource return selectedFieldNames; } - TypeInformation<?>[] getSelectedFieldTypes() { - TypeInformation<?>[] selectedFieldTypes = new TypeInformation<?>[selectedFields.length]; + DataType[] getSelectedFieldDataTypes() { + DataType[] selectedFieldTypes = new DataType[selectedFields.length]; for (int i = 0; i < selectedFields.length; i++) { selectedFieldTypes[i] = fieldTypes[selectedFields[i]]; } return selectedFieldTypes; } + TypeInformation<?>[] getSelectedFieldTypes() { + return Arrays.stream(getSelectedFieldDataTypes()) + .map(TypeConversions::fromDataTypeToLegacyInfo) + .toArray(TypeInformation[]::new); + } + RowCsvInputFormat createInputFormat() { RowCsvInputFormat inputFormat = new RowCsvInputFormat( new Path(path), diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java index 386c489..818f9d2 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java @@ -85,6 +85,8 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory { properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE); properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE); properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME); + // schema watermark + properties.add(SCHEMA + "." 
+ DescriptorProperties.WATERMARK + ".*"); return properties; } @@ -122,7 +124,7 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory { params.getOptionalString(FORMAT_LINE_DELIMITER).ifPresent(csvTableSourceBuilder::lineDelimiter); for (int i = 0; i < tableSchema.getFieldCount(); ++i) { - csvTableSourceBuilder.field(tableSchema.getFieldNames()[i], tableSchema.getFieldTypes()[i]); + csvTableSourceBuilder.field(tableSchema.getFieldNames()[i], tableSchema.getFieldDataTypes()[i]); } params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER).ifPresent(csvTableSourceBuilder::quoteCharacter); params.getOptionalString(FORMAT_COMMENT_PREFIX).ifPresent(csvTableSourceBuilder::commentPrefix); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala index 21ea4f6..9f1a810 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala @@ -31,6 +31,7 @@ import org.apache.flink.table.dataformat.{BaseRow, GenericRow} import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory +import org.apache.flink.table.runtime.types.PlannerTypeUtils import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo import org.apache.flink.table.sinks.TableSink @@ -223,7 +224,10 @@ object SinkCodeGenerator { case (fieldTypeInfo, i) => val requestedTypeInfo = tt.getTypeAt(i) validateFieldType(requestedTypeInfo) - if (!areTypesCompatible( + // it's safe to check only assignability here, because the conversion from the internal type (Decimal) + // to the external type (BigDecimal) doesn't lose precision; the internal type already + // matches the expected type defined in the DDL.
+ if (!PlannerTypeUtils.isAssignable( fromTypeInfoToLogicalType(fieldTypeInfo), fromTypeInfoToLogicalType(requestedTypeInfo)) && !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala index 1793616..61080dc 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch + import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.dag.Transformation @@ -27,6 +28,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction import org.apache.flink.table.api.TableException import org.apache.flink.table.dataformat.BaseRow +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.CodeGeneratorContext import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode} @@ -90,9 +92,10 @@ class BatchExecTableSourceScan( val config = planner.getTableConfig val inputTransform = getSourceTransformation(planner.getExecEnv) + val rowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType) val fieldIndexes = TableSourceUtil.computeIndexMapping( tableSource, - tableSourceTable.getRowType, + rowType, tableSourceTable.isStreamingMode) val inputDataType = inputTransform.getOutputType @@ -114,11 +117,17 @@ class BatchExecTableSourceScan( planner.getRelBuilder ) if (needInternalConversion) { + // the produced type may not carry the correct precision the user defined in the DDL, + // because it may have been converted from a legacy type. Fix the precision using the + // logical schema from the DDL; code generation requires the correct precision of input fields.
+ val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType( + tableSource, + rowType) ScanUtil.convertToInternalRow( CodeGeneratorContext(config), inputTransform.asInstanceOf[Transformation[Any]], fieldIndexes, - producedDataType, + fixedProducedDataType, getRowType, getTable.getQualifiedName, config, @@ -132,7 +141,7 @@ class BatchExecTableSourceScan( def needInternalConversion: Boolean = { val fieldIndexes = TableSourceUtil.computeIndexMapping( tableSource, - tableSourceTable.getRowType, + FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType), tableSourceTable.isStreamingMode) ScanUtil.hasTimeAttributeField(fieldIndexes) || ScanUtil.needsConversion(tableSource.getProducedDataType) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala index 67c46f0..dbf5bfd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.{DataTypes, TableException} import org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter import org.apache.flink.table.dataformat.{BaseRow, DataFormatConverters} +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.CodeGeneratorContext import org.apache.flink.table.planner.codegen.OperatorCodeGenerator._ import org.apache.flink.table.planner.delegation.StreamPlanner @@ -43,6 +44,7 @@ import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, P import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource} import org.apache.flink.table.types.{DataType, FieldsDataType} import org.apache.flink.types.Row + import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.metadata.RelMetadataQuery @@ -100,9 +102,10 @@ class StreamExecTableSourceScan( val config = planner.getTableConfig val inputTransform = getSourceTransformation(planner.getExecEnv) + val rowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType) val fieldIndexes = TableSourceUtil.computeIndexMapping( tableSource, - tableSourceTable.getRowType, + rowType, tableSourceTable.isStreamingMode) val inputDataType = inputTransform.getOutputType @@ -134,11 +137,17 @@ class StreamExecTableSourceScan( } val ctx = CodeGeneratorContext(config).setOperatorBaseClass( classOf[AbstractProcessStreamOperator[BaseRow]]) + // the produced type may not carry the correct precision the user defined in the DDL, + // because it may have been converted from a legacy type. Fix the precision using the + // logical schema from the DDL; code generation requires the correct precision of input fields.
+ val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType( + tableSource, + rowType) val conversionTransform = ScanUtil.convertToInternalRow( ctx, inputTransform.asInstanceOf[Transformation[Any]], fieldIndexes, - producedDataType, + fixedProducedDataType, getRowType, getTable.getQualifiedName, config, @@ -182,7 +191,7 @@ class StreamExecTableSourceScan( private def needInternalConversion: Boolean = { val fieldIndexes = TableSourceUtil.computeIndexMapping( tableSource, - tableSourceTable.getRowType, + FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType), tableSourceTable.isStreamingMode) ScanUtil.hasTimeAttributeField(fieldIndexes) || ScanUtil.needsConversion(tableSource.getProducedDataType) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala index 8d8a578..7ea93a7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala @@ -56,7 +56,10 @@ object TableSinkUtils { if (srcLogicalTypes.length != sinkLogicalTypes.length || srcLogicalTypes.zip(sinkLogicalTypes).exists { case (srcType, sinkType) => - !PlannerTypeUtils.isInteroperable(srcType, sinkType) + // it's safe to check only assignability here, because the conversion from the internal type (Decimal) + // to the external type (BigDecimal) doesn't lose precision; the internal type already + // matches the expected type defined in the DDL. + !PlannerTypeUtils.isAssignable(srcType, sinkType) }) { val srcFieldNames = query.getTableSchema.getFieldNames diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala index 49e96ea..89ef980 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala @@ -18,23 +18,20 @@ package org.apache.flink.table.planner.sources -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException, WatermarkSpec} import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral} import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference} import org.apache.flink.table.functions.BuiltInFunctionDefinitions import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.expressions.converter.ExpressionConverter -import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable -import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo -import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType +import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType +import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo +import
org.apache.flink.table.runtime.types.DataTypePrecisionFixer import org.apache.flink.table.sources.{DefinedFieldMapping, DefinedProctimeAttribute, DefinedRowtimeAttributes, RowtimeAttributeDescriptor, TableSource} import org.apache.flink.table.types.DataType -import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, TimestampType, TinyIntType} +import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType, TimestampKind, TimestampType, TinyIntType} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo - import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptCluster import org.apache.calcite.rel.RelNode @@ -42,10 +39,12 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.logical.LogicalValues import org.apache.calcite.rex.{RexLiteral, RexNode} import org.apache.calcite.tools.RelBuilder +import org.apache.flink.table.types.logical.RowType.RowField import java.sql.Timestamp import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** Util class for [[TableSource]]. */ object TableSourceUtil { @@ -63,7 +62,7 @@ */ def computeIndexMapping( tableSource: TableSource[_], - rowType: RelDataType, + rowType: RowType, isStreamTable: Boolean): Array[Int] = { // get rowtime and proctime attributes @@ -72,8 +71,8 @@ // compute mapping of selected fields and time attributes val names = rowType.getFieldNames.toArray val fieldTypes = rowType - .getFieldList - .map(f => FlinkTypeFactory.toLogicalType(f.getType)) + .getFields + .map(_.getType) .toArray val mapping: Array[Int] = fieldTypes.zip(names).map { case (_: TimestampType, name: String) @@ -100,18 +99,20 @@ throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " + s"Rowtime attributes must be of TimestampType.") } - val (physicalName, idx, tpe) = resolveInputField(name, tableSource) + val (physicalName, idx, logicalType) = resolveInputField(name, tableSource) // validate that mapped fields are of the same type - if (!isAssignable(fromTypeInfoToLogicalType(tpe), t)) { + if (!isAssignable(logicalType, t)) { throw new ValidationException(s"Type $t of table field '$name' does not " + - s"match with type $tpe of the field '$physicalName' of the TableSource return type.") + s"match with type $logicalType of the field '$physicalName' of the " + + "TableSource return type.") } idx } - val inputType = fromDataTypeToTypeInfo(tableSource.getProducedDataType) + + val inputType = fromDataTypeToLogicalType(tableSource.getProducedDataType) // ensure that only one field is mapped to an atomic type - if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) { + if (!(inputType.getTypeRoot == LogicalTypeRoot.ROW) && mapping.count(_ >= 0) > 1) { throw new ValidationException( s"More than one table field matched to atomic input type $inputType.") } @@ -119,6 +120,62 @@ mapping } + + /** + * Fixes the precision of [[TableSource#getProducedDataType()]] with the given logical schema + * type. The precision of producedDataType may be lost, because the data type may come from + * a legacy type (e.g. Types.BIG_DEC). However, the precision is important for converting the + * source output to internal rows. + * + * @param tableSource the table source + * @param logicalSchema the logical schema from DDL which carries the correct precisions + * @return the produced data type with correct precisions.
+ */ + def fixPrecisionForProducedDataType( + tableSource: TableSource[_], + logicalSchema: RowType): DataType = { + + // remove proctime field from logical schema, because proctime is not in produced data type + val schemaWithoutProctime = getProctimeAttribute(tableSource) match { + case Some(proctime) => + val fields = logicalSchema + .getFields + .filter(f => !f.getName.equals(proctime)) + .asJava + new RowType(logicalSchema.isNullable, fields) + + case None => logicalSchema + } + + // get the corresponding logical type according to the layout of the source data type + val sourceLogicalType = fromDataTypeToLogicalType(tableSource.getProducedDataType) + def mapping(physicalName: String): String = tableSource match { + case ts: DefinedFieldMapping if ts.getFieldMapping != null => + // swap key and value, mapping from physical field to logical field + val map = ts.getFieldMapping.toMap.map(_.swap) + map(physicalName) + case _ => + physicalName + } + val correspondingLogicalType = sourceLogicalType match { + case outType: RowType => + val fieldsDataType = schemaWithoutProctime.getFields.map(f => (f.getName, f.getType)).toMap + val fields = outType.getFieldNames.map(n => + new RowField(n, fieldsDataType(mapping(n)))).asJava + new RowType(schemaWithoutProctime.isNullable, fields) + + case _ => + // atomic output type + // get the first type of the logical schema list, there must be exactly one type in the list + schemaWithoutProctime.getFields.get(0).getType + } + + // fix the precision + tableSource + .getProducedDataType + .accept(new DataTypePrecisionFixer(correspondingLogicalType)) + } + /** * Returns schema of the selected fields of the given [[TableSource]]. * @@ -139,7 +196,7 @@ val fieldNames = fieldNameArray var fieldTypes = fieldDataTypeArray - .map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType) + .map(fromDataTypeToLogicalType) if (streaming) { // adjust the type of time attributes for streaming tables @@ -184,7 +241,7 @@ val fieldNames = fieldNameArray var fieldTypes = fieldDataTypeArray - .map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType) + .map(fromDataTypeToLogicalType) // patch rowtime field according to WatermarkSpec fieldTypes = if (streaming) { @@ -236,7 +293,7 @@ getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get, streaming) } else { - val fieldTypes = fieldDataTypes.map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType) + val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType) typeFactory.buildRelNodeRowType(fieldNames, fieldTypes) } } @@ -296,11 +353,10 @@ * Creates a RelNode with a schema that corresponds to the given fields. * Fields for which no information is available will have default values.
*/ - def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = { + def createSchemaRelNode(fields: Array[(String, Int, LogicalType)]): RelNode = { val maxIdx = fields.map(_._2).max val idxMap: Map[Int, (String, LogicalType)] = Map( - fields.map(f => f._2 ->(f._1, - fromTypeInfoToLogicalType(f._3))): _*) + fields.map(f => f._2 ->(f._1, f._3)): _*) val (physicalFields, physicalTypes) = (0 to maxIdx) .map(i => idxMap.getOrElse(i, ("", new TinyIntType()))).unzip val physicalSchema: RelDataType = typeFactory.buildRelNodeRowType( @@ -322,7 +378,11 @@ object TableSourceUtil { // push an empty values node with the physical schema on the relbuilder relBuilder.push(createSchemaRelNode(resolvedFields)) // get extraction expression - resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2)) + resolvedFields.map( + f => new ResolvedFieldReference( + f._1, + fromLogicalTypeToTypeInfo(f._3), + f._2)) } else { new Array[ResolvedFieldReference](0) } @@ -371,30 +431,30 @@ object TableSourceUtil { * * @param fieldName The logical field to look up. * @param tableSource The table source in which to look for the field. - * @return The name, index, and type information of the physical field. + * @return The name, index, and logical type of the physical field. */ private def resolveInputField( fieldName: String, - tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = { + tableSource: TableSource[_]): (String, Int, LogicalType) = { - val returnType = fromDataTypeToTypeInfo(tableSource.getProducedDataType) + val returnType = fromDataTypeToLogicalType(tableSource.getProducedDataType) /** Look up a field by name in a type information */ - def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = { - returnType match { + def lookupField(fieldName: String, failMsg: String): (String, Int, LogicalType) = { - case c: CompositeType[_] => + returnType match { + case rt: RowType => // get and check field index - val idx = c.getFieldIndex(fieldName) + val idx = rt.getFieldIndex(fieldName) if (idx < 0) { throw new ValidationException(failMsg) } - // return field name, index, and field type - (fieldName, idx, c.getTypeAt(idx)) - case t: TypeInformation[_] => + // return field name, index, and field type + (fieldName, idx, rt.getTypeAt(idx)) + case _ => // no composite type, we return the full atomic type as field - (fieldName, 0, t) + (fieldName, 0, returnType) } } @@ -423,17 +483,16 @@ object TableSourceUtil { } /** - * Identifies the physical fields in the return type - * [[org.apache.flink.api.common.typeinfo.TypeInformation]] of a [[TableSource]] + * Identifies the physical fields in the return type of a [[TableSource]] * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]]. * * @param fieldNames The field names to look up. * @param tableSource The table source in which to look for the field. - * @return The name, index, and type information of the physical field. + * @return The name, index, and logical type of the physical field. 
*/ private def resolveInputFields( fieldNames: Array[String], - tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = { + tableSource: TableSource[_]): Array[(String, Int, LogicalType)] = { fieldNames.map(resolveInputField(_, tableSource)) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index 5b39bcb..050fc76 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -26,19 +26,26 @@ import org.apache.flink.table.planner.expressions.utils.Func0 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0 import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime +import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row +import org.apache.flink.util.FileUtils + import org.junit.Assert.{assertEquals, fail} import org.junit.rules.ExpectedException import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Ignore, Rule, Test} + +import java.io.File import java.util +import java.math.{BigDecimal => JBigDecimal} +import java.net.URI import scala.collection.JavaConversions._ /** Test cases for catalog table. */ @RunWith(classOf[Parameterized]) -class CatalogTableITCase(isStreamingMode: Boolean) { +class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { //~ Instance fields -------------------------------------------------------- private val settings = if (isStreamingMode) { @@ -135,11 +142,11 @@ class CatalogTableITCase(isStreamingMode: Boolean) { @Test def testInsertInto(): Unit = { val sourceData = List( - toRow(1, "1000", 2), - toRow(2, "1", 3), - toRow(3, "2000", 4), - toRow(1, "2", 2), - toRow(2, "3000", 3) + toRow(1, "1000", 2, new JBigDecimal("10.001")), + toRow(2, "1", 3, new JBigDecimal("10.001")), + toRow(3, "2000", 4, new JBigDecimal("10.001")), + toRow(1, "2", 2, new JBigDecimal("10.001")), + toRow(2, "3000", 3, new JBigDecimal("10.001")) ) TestCollectionTableFactory.initData(sourceData) val sourceDDL = @@ -147,7 +154,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) { |create table t1( | a int, | b varchar, - | c int + | c int, + | d DECIMAL(10, 3) |) with ( | 'connector' = 'COLLECTION' |) @@ -157,7 +165,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) { |create table t2( | a int, | b varchar, - | c int + | c int, + | d DECIMAL(10, 3) |) with ( | 'connector' = 'COLLECTION' |) @@ -165,7 +174,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) { val query = """ |insert into t2 - |select t1.a, t1.b, (t1.a + 1) as c from t1 + |select t1.a, t1.b, (t1.a + 1) as c , d from t1 """.stripMargin tableEnv.sqlUpdate(sourceDDL) tableEnv.sqlUpdate(sinkDDL) @@ -175,6 +184,71 @@ class CatalogTableITCase(isStreamingMode: Boolean) { } @Test + def testReadWriteCsvUsingDDL(): Unit = { + val csvRecords = Seq( + "2.02,Euro,2019-12-12 00:00:01.001001", + "1.11,US Dollar,2019-12-12 00:00:02.002001", + "50,Yen,2019-12-12 00:00:04.004001", + "3.1,Euro,2019-12-12 00:00:05.005001", + "5.33,US Dollar,2019-12-12 00:00:06.006001" + ) + val tempFilePath = createTempFile( + 
"csv-order-test", + csvRecords.mkString("#")) + val sourceDDL = + s""" + |CREATE TABLE T1 ( + | price DECIMAL(10, 2), + | currency STRING, + | ts TIMESTAMP(3), + | WATERMARK FOR ts AS ts + |) WITH ( + | 'connector.type' = 'filesystem', + | 'connector.path' = '$tempFilePath', + | 'format.type' = 'csv', + | 'format.field-delimiter' = ',', + | 'format.line-delimiter' = '#' + |) + """.stripMargin + tableEnv.sqlUpdate(sourceDDL) + + val sinkFilePath = getTempFilePath("csv-order-sink") + val sinkDDL = + s""" + |CREATE TABLE T2 ( + | window_end TIMESTAMP(3), + | max_ts TIMESTAMP(6), + | counter BIGINT, + | total_price DECIMAL(10, 2) + |) with ( + | 'connector.type' = 'filesystem', + | 'connector.path' = '$sinkFilePath', + | 'format.type' = 'csv', + | 'format.field-delimiter' = ',' + |) + """.stripMargin + tableEnv.sqlUpdate(sinkDDL) + + val query = + """ + |INSERT INTO T2 + |SELECT + | TUMBLE_END(ts, INTERVAL '5' SECOND), + | MAX(ts), + | COUNT(*), + | MAX(price) + |FROM T1 + |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND) + """.stripMargin + tableEnv.sqlUpdate(query) + execJob("testJob") + + val expected = + "2019-12-12 00:00:05.0,2019-12-12 00:00:04.004,3,50.00\n" + + "2019-12-12 00:00:10.0,2019-12-12 00:00:06.006,2,5.33\n" + assertEquals(expected, FileUtils.readFileUtf8(new File(new URI(sinkFilePath)))) + } + @Test def testInsertSourceTableExpressionFields(): Unit = { val sourceData = List( toRow(1, "1000"), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala index 27b0269..21c3ea3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala @@ -91,21 +91,21 @@ class TableSinkValidationTest extends TableTestBase { expectedException.expectMessage( "Field types of query result and registered TableSink `default_catalog`." 
+ "`default_database`.`testSink` do not match.\n" + - "Query result schema: [a: INT, b: BIGINT, c: STRING, d: DECIMAL(10, 8)]\n" + - "TableSink schema: [a: INT, b: BIGINT, c: STRING, d: DECIMAL(10, 7)]") + "Query result schema: [a: INT, b: BIGINT, c: STRING, d: BIGINT]\n" + + "TableSink schema: [a: INT, b: BIGINT, c: STRING, d: INT]") val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val sourceTable = env.fromCollection(TestData.tupleData3).toTable(tEnv, 'a, 'b, 'c) tEnv.registerTable("source", sourceTable) - val resultTable = tEnv.sqlQuery("select a, b, c, cast(b as decimal(10, 8)) as d from source") + val resultTable = tEnv.sqlQuery("select a, b, c, b as d from source") val sinkSchema = TableSchema.builder() .field("a", DataTypes.INT()) .field("b", DataTypes.BIGINT()) .field("c", DataTypes.STRING()) - .field("d", DataTypes.DECIMAL(10, 7)) + .field("d", DataTypes.INT()) .build() val sink = new DataTypeOutputFormatTableSink(sinkSchema) tEnv.registerTableSink("testSink", sink) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixer.java new file mode 100644 index 0000000..c2674a6 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixer.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.types; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.DataTypeVisitor; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.KeyValueDataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +import java.util.Map; + +/** + * The data type visitor used to fix the precision of a data type with the given logical type, + * which carries the correct precisions. The original data type may lose precision because + * of the conversion from {@link org.apache.flink.api.common.typeinfo.TypeInformation}. + */ +public final class DataTypePrecisionFixer implements DataTypeVisitor<DataType> { + + private final LogicalType logicalType; + + /** + * Creates a new instance with the given logical type. + * @param logicalType the logical type which carries the correct precisions. + */ + public DataTypePrecisionFixer(LogicalType logicalType) { + this.logicalType = logicalType; + } + + @Override + public DataType visit(AtomicDataType dataType) { + switch (logicalType.getTypeRoot()) { + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return DataTypes + // fix the precision and scale, because the precision may be lost or incorrect. + // the precision from the DDL is the only source of truth. + // we don't care about nullability for now. + .DECIMAL(decimalType.getPrecision(), decimalType.getScale()) + // only keep the original conversion class + .bridgedTo(dataType.getConversionClass()); + + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) logicalType; + if (timestampType.getKind() == TimestampKind.REGULAR) { + return DataTypes + .TIMESTAMP(timestampType.getPrecision()) + .bridgedTo(dataType.getConversionClass()); + } else { + // keep the original type if it is a time attribute type, + // because time attributes can only have precision 3 + // and the original type may be BIGINT.
+ return dataType; + } + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; + return DataTypes + .TIMESTAMP_WITH_LOCAL_TIME_ZONE(localZonedTimestampType.getPrecision()) + .bridgedTo(dataType.getConversionClass()); + + case TIMESTAMP_WITH_TIME_ZONE: + ZonedTimestampType zonedTimestampType = (ZonedTimestampType) logicalType; + return DataTypes + .TIMESTAMP_WITH_TIME_ZONE(zonedTimestampType.getPrecision()) + .bridgedTo(dataType.getConversionClass()); + + case TIME_WITHOUT_TIME_ZONE: + TimeType timeType = (TimeType) logicalType; + return DataTypes + .TIME(timeType.getPrecision()) + .bridgedTo(dataType.getConversionClass()); + + default: + return dataType; + } + } + + @Override + public DataType visit(CollectionDataType collectionDataType) { + DataType elementType = collectionDataType.getElementDataType(); + switch (logicalType.getTypeRoot()) { + case ARRAY: + ArrayType arrayType = (ArrayType) logicalType; + DataType newArrayElementType = elementType + .accept(new DataTypePrecisionFixer(arrayType.getElementType())); + return DataTypes + .ARRAY(newArrayElementType) + .bridgedTo(collectionDataType.getConversionClass()); + + case MULTISET: + MultisetType multisetType = (MultisetType) logicalType; + DataType newMultisetElementType = elementType + .accept(new DataTypePrecisionFixer(multisetType.getElementType())); + return DataTypes + .MULTISET(newMultisetElementType) + .bridgedTo(collectionDataType.getConversionClass()); + + default: + throw new UnsupportedOperationException("Unsupported logical type : " + logicalType); + } + } + + @Override + public DataType visit(FieldsDataType fieldsDataType) { + Map<String, DataType> fieldDataTypes = fieldsDataType.getFieldDataTypes(); + if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW) { + RowType rowType = (RowType) logicalType; + DataTypes.Field[] fields = rowType.getFields().stream() + .map(f -> { + // recursively fix every field with the logical type declared in the DDL + DataType fieldType = fieldDataTypes.get(f.getName()); + DataType newFieldType = fieldType.accept(new DataTypePrecisionFixer(f.getType())); + return DataTypes.FIELD(f.getName(), newFieldType); + }) + .toArray(DataTypes.Field[]::new); + return DataTypes.ROW(fields).bridgedTo(fieldsDataType.getConversionClass()); + } + throw new UnsupportedOperationException("Unsupported logical type : " + logicalType); + } + + @Override + public DataType visit(KeyValueDataType keyValueDataType) { + DataType keyType = keyValueDataType.getKeyDataType(); + DataType valueType = keyValueDataType.getValueDataType(); + if (logicalType.getTypeRoot() == LogicalTypeRoot.MAP) { + MapType mapType = (MapType) logicalType; + DataType newKeyType = keyType.accept(new DataTypePrecisionFixer(mapType.getKeyType())); + DataType newValueType = valueType.accept(new DataTypePrecisionFixer(mapType.getValueType())); + return DataTypes + .MAP(newKeyType, newValueType) + .bridgedTo(keyValueDataType.getConversionClass()); + } + throw new UnsupportedOperationException("Unsupported logical type : " + logicalType); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java index 77d346b..724828e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java +++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java @@ -115,9 +115,7 @@ public class PlannerTypeUtils { * Now in the conversion to the TypeInformation from DataType, a type may lose some information * about nullability and precision. So we add this method to do a soft check. * - * <p>The difference of {@link #isInteroperable} is ignore decimal precision. - * - * <p>Now not ignore timestamp precision, because we only support one precision for timestamp type now. + * <p>The difference from {@link #isInteroperable} is that precisions are ignored. */ public static boolean isAssignable(LogicalType t1, LogicalType t2) { // Soft check for CharType, it is converted to String TypeInformation and loses char information. @@ -134,7 +132,12 @@ public class PlannerTypeUtils { } switch (t1.getTypeRoot()) { + // precision is ignored only for DECIMAL, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE; + // precision is still considered for other types (e.g. TIME). + // TODO: add other precision types here in the future case DECIMAL: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return true; default: if (t1.getChildren().isEmpty()) { diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java new file mode 100644 index 0000000..0730ed2 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.types; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link DataTypePrecisionFixer}. + */ +@RunWith(Parameterized.class) +public class DataTypePrecisionFixerTest { + + @Parameterized.Parameters(name = "{index}: [From: {0}, To: {1}]") + public static List<TestSpec> testData() { + return Arrays.asList( + + TestSpecs + .fix(Types.BIG_DEC) + .logicalType(new DecimalType(10, 5)) + .expect(DataTypes.DECIMAL(10, 5)), + + TestSpecs + .fix(Types.SQL_TIMESTAMP) + .logicalType(new TimestampType(9)) + .expect(DataTypes.TIMESTAMP(9).bridgedTo(Timestamp.class)), + + TestSpecs + .fix(Types.SQL_TIME) + .logicalType(new TimeType(9)) + .expect(DataTypes.TIME(9).bridgedTo(Time.class)), + + TestSpecs + .fix(Types.SQL_DATE) + .logicalType(new DateType()) + .expect(DataTypes.DATE().bridgedTo(Date.class)), + + TestSpecs + .fix(Types.LOCAL_DATE_TIME) + .logicalType(new TimestampType(9)) + .expect(DataTypes.TIMESTAMP(9)), + + TestSpecs + .fix(Types.LOCAL_TIME) + .logicalType(new TimeType(9)) + .expect(DataTypes.TIME(9)), + + TestSpecs + .fix(Types.LOCAL_DATE) + .logicalType(new DateType()) + .expect(DataTypes.DATE()), + + TestSpecs + .fix(Types.INSTANT) + .logicalType(new LocalZonedTimestampType(2)) + .expect(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(2)), + + TestSpecs + .fix(Types.STRING) + .logicalType(new VarCharType(VarCharType.MAX_LENGTH)) + .expect(DataTypes.STRING()), + + // nested + TestSpecs + .fix(Types.ROW_NAMED( + new String[] {"field1", "field2"}, + Types.MAP(Types.BIG_DEC, Types.SQL_TIMESTAMP), + Types.OBJECT_ARRAY(Types.SQL_TIME))) + .logicalType(new RowType( + Arrays.asList( + new RowType.RowField("field1", new MapType( + new DecimalType(20, 2), + new TimestampType(0))), + new RowType.RowField("field2", new ArrayType(new TimeType(8))) + ) + )) + .expect( + DataTypes.ROW( + FIELD("field1", DataTypes.MAP( + DataTypes.DECIMAL(20, 2), + DataTypes.TIMESTAMP(0).bridgedTo(Timestamp.class))), + FIELD("field2", DataTypes.ARRAY( + DataTypes.TIME(8).bridgedTo(Time.class))))) + + ); + } + + @Parameterized.Parameter + public TestSpec testSpec; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testPrecisionFixing() { + DataType 
dataType = fromLegacyInfoToDataType(testSpec.typeInfo); + DataType newDataType = dataType.accept(new DataTypePrecisionFixer(testSpec.logicalType)); + assertEquals(testSpec.expectedType, newDataType); + } + + // -------------------------------------------------------------------------------------------- + + private static class TestSpec { + private final TypeInformation<?> typeInfo; + private final LogicalType logicalType; + private final DataType expectedType; + + private TestSpec( + TypeInformation<?> typeInfo, + LogicalType logicalType, + DataType expectedType) { + this.typeInfo = checkNotNull(typeInfo); + this.logicalType = checkNotNull(logicalType); + this.expectedType = checkNotNull(expectedType); + } + } + + private static class TestSpecs { + private TypeInformation<?> typeInfo; + private LogicalType logicalType; + + static TestSpecs fix(TypeInformation<?> typeInfo) { + TestSpecs testSpecs = new TestSpecs(); + testSpecs.typeInfo = typeInfo; + return testSpecs; + } + + TestSpecs logicalType(LogicalType logicalType) { + this.logicalType = logicalType; + return this; + } + + TestSpec expect(DataType expectedType) { + return new TestSpec(typeInfo, logicalType, expectedType); + } + } + +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java index 67209f0..cdc6c4c 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java @@ -104,7 +104,7 @@ public class LogicalTypeAssignableTest { {new TimeType(), new TimeType(9), false}, - {new TimestampType(9), new TimestampType(3), false}, + {new TimestampType(9), new TimestampType(3), true}, {new ZonedTimestampType(9), new ZonedTimestampType(3), false},
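[Reviewer's illustration, not part of the patch: a minimal end-to-end use of the new visitor, mirroring the Types.BIG_DEC case in DataTypePrecisionFixerTest above; the class name PrecisionFixSketch is invented for the example.]

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.runtime.types.DataTypePrecisionFixer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.utils.TypeConversions;

public class PrecisionFixSketch {
    public static void main(String[] args) {
        // A DataType recovered from legacy type information no longer knows the
        // DECIMAL(10, 3) that was declared in the DDL.
        DataType fromLegacy = TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
        // The visitor re-applies precision and scale from the DDL's logical type,
        // keeping only the original conversion class (java.math.BigDecimal).
        DataType fixed = fromLegacy.accept(new DataTypePrecisionFixer(new DecimalType(10, 3)));
        System.out.println(fixed); // expected: DECIMAL(10, 3)
    }
}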