This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 104775e1d0fe072da4b00851828af1a1df6cb8dd Author: Timo Walther <[email protected]> AuthorDate: Mon Jul 13 15:17:50 2020 +0200 [FLINK-18586][table-common] Simplify the creation of explicit structured types This closes #12887. --- .../java/org/apache/flink/table/api/DataTypes.java | 53 ++++++++++++++++++++++ .../table/types/extraction/DataTypeExtractor.java | 25 ++-------- .../table/types/extraction/ExtractionUtils.java | 34 +++++++------- .../apache/flink/table/types/DataTypesTest.java | 27 ++++++++++- 4 files changed, 101 insertions(+), 38 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java index 396e7d0..021d616 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.CollectionDataType; @@ -52,6 +53,8 @@ import org.apache.flink.table.types.logical.RawType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; @@ 
-74,6 +77,8 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass; + /** * A {@link DataType} can be used to declare input and/or output types of operations. This class * enumerates all pre-defined data types of the Table & SQL API. @@ -786,6 +791,54 @@ public final class DataTypes { return new AtomicDataType(new TypeInformationRawType<>(typeInformation)); } + /** + * Data type of a user-defined object structured type. Structured types contain zero, one or more + * attributes. Each attribute consists of a name and a type. A type cannot be defined so that one of + * its attribute types (transitively) uses itself. + * + * <p>There are two kinds of structured types. Types that are stored in a catalog and are identified + * by an {@link ObjectIdentifier} or anonymously defined, unregistered types (usually reflectively + * extracted) that are identified by an implementation {@link Class}. + * + * <p>This method helps in manually constructing anonymous, unregistered types. This is useful in + * cases where the reflective extraction using {@link DataTypes#of(Class)} is not applicable. However, + * {@link DataTypes#of(Class)} is the recommended way of creating inline structured types as it also + * considers {@link DataTypeHint}s. + * + * <p>Structured types are converted to internal data structures by the runtime. The given implementation + * class is only used at the edges of the table ecosystem (e.g. when bridging to a function or connector). + * Serialization and equality ({@code hashCode/equals}) are handled by the runtime based on the logical + * type. An implementation class must offer a default constructor with zero arguments or a full constructor + * that assigns all attributes. 
+ * + * <p>Note: A caller of this method must make sure that the {@link DataType#getConversionClass()} of each + * given field matches the corresponding attribute of the given implementation class; otherwise, an exception + * might be thrown at runtime. + * + * @see DataTypes#of(Class) + * @see StructuredType + */ + public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... fields) { + // some basic validation of the class to prevent common mistakes + validateStructuredClass(implementationClass); + + final StructuredType.Builder builder = StructuredType.newBuilder(implementationClass); + final List<StructuredAttribute> attributes = Stream.of(fields) + .map(f -> + new StructuredAttribute( + f.getName(), + f.getDataType().getLogicalType(), + f.getDescription().orElse(null))) + .collect(Collectors.toList()); + builder.attributes(attributes); + builder.setFinal(true); + builder.setInstantiable(true); + final List<DataType> fieldDataTypes = Stream.of(fields) + .map(DataTypes.Field::getDataType) + .collect(Collectors.toList()); + return new FieldsDataType(builder.build(), implementationClass, fieldDataTypes); + } + // -------------------------------------------------------------------------------------------- // Helper functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java index 3b20183..493b5d1 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java @@ -23,10 +23,7 @@ import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.api.DataTypes; import
org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.FieldsDataType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.StructuredType; -import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; import org.apache.flink.table.types.utils.ClassDataTypeConverter; import org.apache.flink.types.Row; @@ -515,20 +512,11 @@ public final class DataTypeExtractor { type, fields); - final List<StructuredAttribute> attributes = createStructuredTypeAttributes( + final DataTypes.Field[] attributes = createStructuredTypeAttributes( constructor, fieldDataTypes); - final StructuredType.Builder builder = StructuredType.newBuilder(clazz); - builder.attributes(attributes); - builder.setFinal(true); // anonymous structured types should not allow inheritance - builder.setInstantiable(true); - return new FieldsDataType( - builder.build(), - clazz, - attributes.stream() - .map(a -> fieldDataTypes.get(a.getName())) - .collect(Collectors.toList())); + return DataTypes.STRUCTURED(clazz, attributes); } private Map<String, DataType> extractStructuredTypeFields( @@ -560,7 +548,7 @@ public final class DataTypeExtractor { return fieldDataTypes; } - private List<StructuredAttribute> createStructuredTypeAttributes( + private DataTypes.Field[] createStructuredTypeAttributes( ExtractionUtils.AssigningConstructor constructor, Map<String, DataType> fieldDataTypes) { return Optional.ofNullable(constructor) @@ -572,11 +560,8 @@ public final class DataTypeExtractor { // field order is sorted return fieldDataTypes.keySet().stream().sorted(); }) - .map(name -> { - final LogicalType logicalType = fieldDataTypes.get(name).getLogicalType(); - return new StructuredAttribute(name, logicalType); - }) - .collect(Collectors.toList()); + .map(name -> DataTypes.FIELD(name, fieldDataTypes.get(name))) + .toArray(DataTypes.Field[]::new); } /** diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java index e814fb6..2ed11ff 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java @@ -146,6 +146,23 @@ public final class ExtractionUtils { } /** + * Validates the characteristics of a class for a {@link StructuredType} such as accessibility. + */ + public static void validateStructuredClass(Class<?> clazz) { + final int m = clazz.getModifiers(); + if (Modifier.isAbstract(m)) { + throw extractionError("Class '%s' must not be abstract.", clazz.getName()); + } + if (!Modifier.isPublic(m)) { + throw extractionError("Class '%s' is not public.", clazz.getName()); + } + if (clazz.getEnclosingClass() != null && + (clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) { + throw extractionError("Class '%s' is a not a static, globally accessible class.", clazz.getName()); + } + } + + /** * Returns the field of a structured type. The logic is as broad as possible to support * both Java and Scala in different flavors. */ @@ -434,23 +451,6 @@ public final class ExtractionUtils { } /** - * Validates the characteristics of a class for a {@link StructuredType} such as accessibility. 
- */ - static void validateStructuredClass(Class<?> clazz) { - final int m = clazz.getModifiers(); - if (Modifier.isAbstract(m)) { - throw extractionError("Class '%s' must not be abstract.", clazz.getName()); - } - if (!Modifier.isPublic(m)) { - throw extractionError("Class '%s' is not public.", clazz.getName()); - } - if (clazz.getEnclosingClass() != null && - (clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) { - throw extractionError("Class '%s' is a not a static, globally accessible class.", clazz.getName()); - } - } - - /** * Validates if a given type is not already contained in the type hierarchy of a structured type. * * <p>Otherwise this would lead to infinite data type extraction cycles. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java index 072534b..6552b80 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java @@ -357,7 +357,15 @@ public class DataTypesTest { .forUnresolvedDataType(RAW(Object.class)) .expectUnresolvedString("[RAW('java.lang.Object', '?')]") .lookupReturns(DataTypes.RAW(new GenericTypeInfo<>(Object.class))) - .expectResolvedDataType(DataTypes.RAW(new GenericTypeInfo<>(Object.class))) + .expectResolvedDataType(DataTypes.RAW(new GenericTypeInfo<>(Object.class))), + + TestSpec + .forUnresolvedDataType(DataTypes.of(SimplePojo.class)) + .expectResolvedDataType( + DataTypes.STRUCTURED( + SimplePojo.class, + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class)))) ); } @@ -475,4 +483,21 @@ public class DataTypesTest { return abstractDataType.toString(); } } + + // -------------------------------------------------------------------------------------------- + // Helper classes 
+ // -------------------------------------------------------------------------------------------- + + /** + * Simple POJO for testing. + */ + public static class SimplePojo { + public final String name; + public final int count; + + public SimplePojo(String name, int count) { + this.name = name; + this.count = count; + } + } }
