This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5aa1d96f0e846f09a883d5a0b414048576e3c001 Author: Timo Walther <[email protected]> AuthorDate: Mon Jul 13 15:47:52 2020 +0200 [hotfix][table-common] Update DataTypeUtils.transform for structured types --- .../flink/table/types/utils/DataTypeUtils.java | 88 +++++++++++++++++----- .../types/inference/TypeTransformationsTest.java | 38 ++++++++++ 2 files changed, 109 insertions(+), 17 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java index 29bb6a7..e1bdaf6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.CollectionDataType; import org.apache.flink.table.types.DataType; @@ -39,13 +40,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.flink.util.Preconditions; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -84,12 +88,17 @@ public final class DataTypeUtils { } /** - * Transforms the given data type (can be nested) to a different data type using the given - * transformations. The given transformations will be called in order. + * Transforms the given data type to a different data type using the given transformations. + * + * <p>The transformations will be called in the given order. In case of constructed or composite + * types, a transformation will be applied transitively to children first. + * + * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can be + * transformed. * * @param typeToTransform data type to be transformed. * @param transformations the transformations to transform data type to another type. - * @return the new data type, + * @return the new data type */ public static DataType transform(DataType typeToTransform, TypeTransformation... transformations) { Preconditions.checkArgument(transformations.length > 0, "transformations should not be empty."); @@ -183,6 +192,12 @@ public final class DataTypeUtils { } } + /** + * Transforms a {@link DataType}. + * + * <p>In case of constructed or composite types, a transformation will be applied transitively to + * children first. + */ private static class DataTypeTransformer implements DataTypeVisitor<DataType> { private final TypeTransformation transformation; @@ -198,9 +213,9 @@ public final class DataTypeUtils { @Override public DataType visit(CollectionDataType collectionDataType) { - DataType newElementType = collectionDataType.getElementDataType().accept(this); - LogicalType logicalType = collectionDataType.getLogicalType(); - LogicalType newLogicalType; + final DataType newElementType = collectionDataType.getElementDataType().accept(this); + final LogicalType logicalType = collectionDataType.getLogicalType(); + final LogicalType newLogicalType; if (logicalType instanceof ArrayType) { newLogicalType = new ArrayType( logicalType.isNullable(), @@ -217,37 +232,60 @@ public final class DataTypeUtils { @Override public DataType visit(FieldsDataType fieldsDataType) { - final List<DataType> newFields = fieldsDataType.getChildren().stream() + final List<DataType> newDataTypes = fieldsDataType.getChildren().stream() .map(dt -> dt.accept(this)) .collect(Collectors.toList()); final LogicalType logicalType = fieldsDataType.getLogicalType(); final LogicalType newLogicalType; if (logicalType instanceof RowType) { - final List<RowType.RowField> oldFields = ((RowType) logicalType).getFields(); - final List<RowType.RowField> newRowFields = IntStream.range(0, oldFields.size()) + final List<RowField> oldFields = ((RowType) logicalType).getFields(); + final List<RowField> newFields = IntStream.range(0, oldFields.size()) .mapToObj(i -> - new RowType.RowField( + new RowField( oldFields.get(i).getName(), - newFields.get(i).getLogicalType(), + newDataTypes.get(i).getLogicalType(), oldFields.get(i).getDescription().orElse(null))) .collect(Collectors.toList()); newLogicalType = new RowType( logicalType.isNullable(), - newRowFields); + newFields); + } else if (logicalType instanceof StructuredType) { + final StructuredType structuredType = (StructuredType) logicalType; + if (structuredType.getSuperType().isPresent()) { + throw new UnsupportedOperationException("Hierarchies of structured types are not supported yet."); + } + final List<StructuredAttribute> oldAttributes = structuredType.getAttributes(); + final List<StructuredAttribute> newAttributes = IntStream.range(0, oldAttributes.size()) + .mapToObj(i -> + new StructuredAttribute( + oldAttributes.get(i).getName(), + newDataTypes.get(i).getLogicalType(), + oldAttributes.get(i).getDescription().orElse(null))) + .collect(Collectors.toList()); + + final StructuredType.Builder builder = createStructuredBuilder(structuredType); + builder.attributes(newAttributes); + builder.setNullable(structuredType.isNullable()); + builder.setFinal(structuredType.isFinal()); + builder.setInstantiable(structuredType.isInstantiable()); + builder.comparision(structuredType.getComparision()); + structuredType.getDescription().ifPresent(builder::description); + + newLogicalType = builder.build(); } else { throw new UnsupportedOperationException("Unsupported logical type : " + logicalType); } - return transformation.transform(new FieldsDataType(newLogicalType, newFields)); + return transformation.transform(new FieldsDataType(newLogicalType, newDataTypes)); } @Override public DataType visit(KeyValueDataType keyValueDataType) { - DataType newKeyType = keyValueDataType.getKeyDataType().accept(this); - DataType newValueType = keyValueDataType.getValueDataType().accept(this); - LogicalType logicalType = keyValueDataType.getLogicalType(); - LogicalType newLogicalType; + final DataType newKeyType = keyValueDataType.getKeyDataType().accept(this); + final DataType newValueType = keyValueDataType.getValueDataType().accept(this); + final LogicalType logicalType = keyValueDataType.getLogicalType(); + final LogicalType newLogicalType; if (logicalType instanceof MapType) { newLogicalType = new MapType( logicalType.isNullable(), @@ -258,6 +296,22 @@ public final class DataTypeUtils { } return transformation.transform(new KeyValueDataType(newLogicalType, newKeyType, newValueType)); } + + // ---------------------------------------------------------------------------------------- + + private StructuredType.Builder createStructuredBuilder(StructuredType structuredType) { + final Optional<ObjectIdentifier> identifier = structuredType.getObjectIdentifier(); + final Optional<Class<?>> implementationClass = structuredType.getImplementationClass(); + if (identifier.isPresent() && implementationClass.isPresent()) { + return StructuredType.newBuilder(identifier.get(), implementationClass.get()); + } else if (identifier.isPresent()) { + return StructuredType.newBuilder(identifier.get()); + } else if (implementationClass.isPresent()) { + return StructuredType.newBuilder(implementationClass.get()); + } else { + throw new IllegalArgumentException("Invalid structured type."); + } + } } private static TableSchema expandCompositeType(FieldsDataType dataType) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java index 64c0342..ff33d5e 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.types.utils.TypeConversions; @@ -32,6 +34,7 @@ import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import static org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS; import static org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal; import static org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw; import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes; @@ -44,6 +47,22 @@ import static org.junit.Assert.assertEquals; public class TypeTransformationsTest { @Test + public void testToInternal() { + DataType dataType = DataTypes.STRUCTURED( + SimplePojo.class, + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class))); + + DataType expected = DataTypes.STRUCTURED( + SimplePojo.class, + DataTypes.FIELD("name", DataTypes.STRING().bridgedTo(StringData.class)), + DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(Integer.class))) + .bridgedTo(RowData.class); + + assertEquals(expected, DataTypeUtils.transform(dataType, TO_INTERNAL_CLASS)); + } + + @Test public void testTimeToSqlTypes() { DataType dataType = DataTypes.ROW( DataTypes.FIELD("a", DataTypes.STRING()), @@ -130,6 +149,8 @@ public class TypeTransformationsTest { assertEquals(expected, DataTypeUtils.transform(dataType, toNullable())); } + // -------------------------------------------------------------------------------------------- + private static DataType createLegacyDecimal() { return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC); } @@ -137,4 +158,21 @@ public class TypeTransformationsTest { private static DataType createLegacyRaw() { return TypeConversions.fromLegacyInfoToDataType(Types.GENERIC(TypeTransformationsTest.class)); } + + // -------------------------------------------------------------------------------------------- + // Helper classes + // -------------------------------------------------------------------------------------------- + + /** + * Simple POJO for testing. + */ + public static class SimplePojo { + public final String name; + public final int count; + + public SimplePojo(String name, int count) { + this.name = name; + this.count = count; + } + } }
