This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5aa1d96f0e846f09a883d5a0b414048576e3c001
Author: Timo Walther <[email protected]>
AuthorDate: Mon Jul 13 15:47:52 2020 +0200

    [hotfix][table-common] Update DataTypeUtils.transform for structured types
---
 .../flink/table/types/utils/DataTypeUtils.java     | 88 +++++++++++++++++-----
 .../types/inference/TypeTransformationsTest.java   | 38 ++++++++++
 2 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 29bb6a7..e1bdaf6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
@@ -39,13 +40,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -84,12 +88,17 @@ public final class DataTypeUtils {
        }
 
        /**
-        * Transforms the given data type (can be nested) to a different data type using the given
-        * transformations. The given transformations will be called in order.
+        * Transforms the given data type to a different data type using the given transformations.
+        *
+        * <p>The transformations will be called in the given order. In case of constructed or composite
+        * types, a transformation will be applied transitively to children first.
+        *
+        * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can be
+        * transformed.
         *
         * @param typeToTransform data type to be transformed.
         * @param transformations the transformations to transform data type to another type.
-        * @return the new data type,
+        * @return the new data type
         */
        public static DataType transform(DataType typeToTransform, 
TypeTransformation... transformations) {
                Preconditions.checkArgument(transformations.length > 0, 
"transformations should not be empty.");
@@ -183,6 +192,12 @@ public final class DataTypeUtils {
                }
        }
 
+       /**
+        * Transforms a {@link DataType}.
+        *
+        * <p>In case of constructed or composite types, a transformation will 
be applied transitively to
+        * children first.
+        */
        private static class DataTypeTransformer implements 
DataTypeVisitor<DataType> {
 
                private final TypeTransformation transformation;
@@ -198,9 +213,9 @@ public final class DataTypeUtils {
 
                @Override
                public DataType visit(CollectionDataType collectionDataType) {
-                       DataType newElementType = 
collectionDataType.getElementDataType().accept(this);
-                       LogicalType logicalType = 
collectionDataType.getLogicalType();
-                       LogicalType newLogicalType;
+                       final DataType newElementType = 
collectionDataType.getElementDataType().accept(this);
+                       final LogicalType logicalType = 
collectionDataType.getLogicalType();
+                       final LogicalType newLogicalType;
                        if (logicalType instanceof ArrayType) {
                                newLogicalType = new ArrayType(
                                        logicalType.isNullable(),
@@ -217,37 +232,60 @@ public final class DataTypeUtils {
 
                @Override
                public DataType visit(FieldsDataType fieldsDataType) {
-                       final List<DataType> newFields = 
fieldsDataType.getChildren().stream()
+                       final List<DataType> newDataTypes = 
fieldsDataType.getChildren().stream()
                                .map(dt -> dt.accept(this))
                                .collect(Collectors.toList());
 
                        final LogicalType logicalType = 
fieldsDataType.getLogicalType();
                        final LogicalType newLogicalType;
                        if (logicalType instanceof RowType) {
-                               final List<RowType.RowField> oldFields = 
((RowType) logicalType).getFields();
-                               final List<RowType.RowField> newRowFields = 
IntStream.range(0, oldFields.size())
+                               final List<RowField> oldFields = ((RowType) 
logicalType).getFields();
+                               final List<RowField> newFields = 
IntStream.range(0, oldFields.size())
                                        .mapToObj(i ->
-                                               new RowType.RowField(
+                                               new RowField(
                                                        
oldFields.get(i).getName(),
-                                                       
newFields.get(i).getLogicalType(),
+                                                       
newDataTypes.get(i).getLogicalType(),
                                                        
oldFields.get(i).getDescription().orElse(null)))
                                        .collect(Collectors.toList());
 
                                newLogicalType = new RowType(
                                        logicalType.isNullable(),
-                                       newRowFields);
+                                       newFields);
+                       } else if (logicalType instanceof StructuredType) {
+                               final StructuredType structuredType = 
(StructuredType) logicalType;
+                               if (structuredType.getSuperType().isPresent()) {
+                                       throw new 
UnsupportedOperationException("Hierarchies of structured types are not 
supported yet.");
+                               }
+                               final List<StructuredAttribute> oldAttributes = 
structuredType.getAttributes();
+                               final List<StructuredAttribute> newAttributes = 
IntStream.range(0, oldAttributes.size())
+                                       .mapToObj(i ->
+                                               new StructuredAttribute(
+                                                       
oldAttributes.get(i).getName(),
+                                                       
newDataTypes.get(i).getLogicalType(),
+                                                       
oldAttributes.get(i).getDescription().orElse(null)))
+                                       .collect(Collectors.toList());
+
+                               final StructuredType.Builder builder = 
createStructuredBuilder(structuredType);
+                               builder.attributes(newAttributes);
+                               
builder.setNullable(structuredType.isNullable());
+                               builder.setFinal(structuredType.isFinal());
+                               
builder.setInstantiable(structuredType.isInstantiable());
+                               
builder.comparision(structuredType.getComparision());
+                               
structuredType.getDescription().ifPresent(builder::description);
+
+                               newLogicalType = builder.build();
                        } else {
                                throw new 
UnsupportedOperationException("Unsupported logical type : " + logicalType);
                        }
-                       return transformation.transform(new 
FieldsDataType(newLogicalType, newFields));
+                       return transformation.transform(new 
FieldsDataType(newLogicalType, newDataTypes));
                }
 
                @Override
                public DataType visit(KeyValueDataType keyValueDataType) {
-                       DataType newKeyType = 
keyValueDataType.getKeyDataType().accept(this);
-                       DataType newValueType = 
keyValueDataType.getValueDataType().accept(this);
-                       LogicalType logicalType = 
keyValueDataType.getLogicalType();
-                       LogicalType newLogicalType;
+                       final DataType newKeyType = 
keyValueDataType.getKeyDataType().accept(this);
+                       final DataType newValueType = 
keyValueDataType.getValueDataType().accept(this);
+                       final LogicalType logicalType = 
keyValueDataType.getLogicalType();
+                       final LogicalType newLogicalType;
                        if (logicalType instanceof MapType) {
                                newLogicalType = new MapType(
                                        logicalType.isNullable(),
@@ -258,6 +296,22 @@ public final class DataTypeUtils {
                        }
                        return transformation.transform(new 
KeyValueDataType(newLogicalType, newKeyType, newValueType));
                }
+
+               // 
----------------------------------------------------------------------------------------
+
+               private StructuredType.Builder 
createStructuredBuilder(StructuredType structuredType) {
+                       final Optional<ObjectIdentifier> identifier = 
structuredType.getObjectIdentifier();
+                       final Optional<Class<?>> implementationClass = 
structuredType.getImplementationClass();
+                       if (identifier.isPresent() && 
implementationClass.isPresent()) {
+                               return 
StructuredType.newBuilder(identifier.get(), implementationClass.get());
+                       } else if (identifier.isPresent()) {
+                               return 
StructuredType.newBuilder(identifier.get());
+                       } else if (implementationClass.isPresent()) {
+                               return 
StructuredType.newBuilder(implementationClass.get());
+                       } else {
+                               throw new IllegalArgumentException("Invalid 
structured type.");
+                       }
+               }
        }
 
        private static TableSchema expandCompositeType(FieldsDataType dataType) 
{
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
index 64c0342..ff33d5e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -32,6 +34,7 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 
+import static 
org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS;
 import static 
org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal;
 import static 
org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw;
 import static 
org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes;
@@ -44,6 +47,22 @@ import static org.junit.Assert.assertEquals;
 public class TypeTransformationsTest {
 
        @Test
+       public void testToInternal() {
+               DataType dataType = DataTypes.STRUCTURED(
+                       SimplePojo.class,
+                       DataTypes.FIELD("name", DataTypes.STRING()),
+                       DataTypes.FIELD("count", 
DataTypes.INT().notNull().bridgedTo(int.class)));
+
+               DataType expected = DataTypes.STRUCTURED(
+                               SimplePojo.class,
+                               DataTypes.FIELD("name", 
DataTypes.STRING().bridgedTo(StringData.class)),
+                               DataTypes.FIELD("count", 
DataTypes.INT().notNull().bridgedTo(Integer.class)))
+                       .bridgedTo(RowData.class);
+
+               assertEquals(expected, DataTypeUtils.transform(dataType, 
TO_INTERNAL_CLASS));
+       }
+
+       @Test
        public void testTimeToSqlTypes() {
                DataType dataType = DataTypes.ROW(
                        DataTypes.FIELD("a", DataTypes.STRING()),
@@ -130,6 +149,8 @@ public class TypeTransformationsTest {
                assertEquals(expected, DataTypeUtils.transform(dataType, 
toNullable()));
        }
 
+       // 
--------------------------------------------------------------------------------------------
+
        private static DataType createLegacyDecimal() {
                return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
        }
@@ -137,4 +158,21 @@ public class TypeTransformationsTest {
        private static DataType createLegacyRaw() {
                return 
TypeConversions.fromLegacyInfoToDataType(Types.GENERIC(TypeTransformationsTest.class));
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper classes
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Simple POJO for testing.
+        */
+       public static class SimplePojo {
+               public final String name;
+               public final int count;
+
+               public SimplePojo(String name, int count) {
+                       this.name = name;
+                       this.count = count;
+               }
+       }
 }

Reply via email to