This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 104775e1d0fe072da4b00851828af1a1df6cb8dd
Author: Timo Walther <[email protected]>
AuthorDate: Mon Jul 13 15:17:50 2020 +0200

    [FLINK-18586][table-common] Simplify the creation of explicit structured 
types
    
    This closes #12887.
---
 .../java/org/apache/flink/table/api/DataTypes.java | 53 ++++++++++++++++++++++
 .../table/types/extraction/DataTypeExtractor.java  | 25 ++--------
 .../table/types/extraction/ExtractionUtils.java    | 34 +++++++-------
 .../apache/flink/table/types/DataTypesTest.java    | 27 ++++++++++-
 4 files changed, 101 insertions(+), 38 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 396e7d0..021d616 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
@@ -52,6 +53,8 @@ import org.apache.flink.table.types.logical.RawType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
@@ -74,6 +77,8 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass;
+
 /**
  * A {@link DataType} can be used to declare input and/or output types of 
operations. This class
  * enumerates all pre-defined data types of the Table & SQL API.
@@ -786,6 +791,54 @@ public final class DataTypes {
                return new AtomicDataType(new 
TypeInformationRawType<>(typeInformation));
        }
 
+       /**
+        * Data type of a user-defined object structured type. Structured types 
contain zero, one or more
+        * attributes. Each attribute consists of a name and a type. A type 
cannot be defined so that one of
+        * its attribute types (transitively) uses itself.
+        *
+        * <p>There are two kinds of structured types. Types that are stored in 
a catalog and are identified
+        * by an {@link ObjectIdentifier} or anonymously defined, unregistered 
types (usually reflectively
+        * extracted) that are identified by an implementation {@link Class}.
+        *
+        * <p>This method helps in manually constructing anonymous, 
unregistered types. This is useful in
+        * cases where the reflective extraction using {@link 
DataTypes#of(Class)} is not applicable. However,
+        * {@link DataTypes#of(Class)} is the recommended way of creating 
inline structured types as it also
+        * considers {@link DataTypeHint}s.
+        *
+        * <p>Structured types are converted to internal data structures by the 
runtime. The given implementation
+        * class is only used at the edges of the table ecosystem (e.g. when 
bridging to a function or connector).
+        * Serialization and equality ({@code hashCode/equals}) are handled by 
the runtime based on the logical
+        * type. An implementation class must offer a default constructor with 
zero arguments or a full constructor
+        * that assigns all attributes.
+        *
+        * <p>Note: A caller of this method must make sure that the {@link 
DataType#getConversionClass()} of the
+        * given fields matches with the attributes of the given implementation 
class, otherwise an exception
+        * might be thrown during runtime.
+        *
+        * @see DataTypes#of(Class)
+        * @see StructuredType
+        */
+       public static <T> DataType STRUCTURED(Class<T> implementationClass, 
Field... fields) {
+               // some basic validation of the class to prevent common mistakes
+               validateStructuredClass(implementationClass);
+
+               final StructuredType.Builder builder = 
StructuredType.newBuilder(implementationClass);
+               final List<StructuredAttribute> attributes = Stream.of(fields)
+                       .map(f ->
+                               new StructuredAttribute(
+                                       f.getName(),
+                                       f.getDataType().getLogicalType(),
+                                       f.getDescription().orElse(null)))
+                       .collect(Collectors.toList());
+               builder.attributes(attributes);
+               builder.setFinal(true);
+               builder.setInstantiable(true);
+               final List<DataType> fieldDataTypes = Stream.of(fields)
+                       .map(DataTypes.Field::getDataType)
+                       .collect(Collectors.toList());
+               return new FieldsDataType(builder.build(), implementationClass, 
fieldDataTypes);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        // Helper functions
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index 3b20183..493b5d1 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -23,10 +23,7 @@ import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.StructuredType;
-import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.utils.ClassDataTypeConverter;
 import org.apache.flink.types.Row;
 
@@ -515,20 +512,11 @@ public final class DataTypeExtractor {
                        type,
                        fields);
 
-               final List<StructuredAttribute> attributes = 
createStructuredTypeAttributes(
+               final DataTypes.Field[] attributes = 
createStructuredTypeAttributes(
                        constructor,
                        fieldDataTypes);
 
-               final StructuredType.Builder builder = 
StructuredType.newBuilder(clazz);
-               builder.attributes(attributes);
-               builder.setFinal(true); // anonymous structured types should 
not allow inheritance
-               builder.setInstantiable(true);
-               return new FieldsDataType(
-                       builder.build(),
-                       clazz,
-                       attributes.stream()
-                               .map(a -> fieldDataTypes.get(a.getName()))
-                               .collect(Collectors.toList()));
+               return DataTypes.STRUCTURED(clazz, attributes);
        }
 
        private Map<String, DataType> extractStructuredTypeFields(
@@ -560,7 +548,7 @@ public final class DataTypeExtractor {
                return fieldDataTypes;
        }
 
-       private List<StructuredAttribute> createStructuredTypeAttributes(
+       private DataTypes.Field[] createStructuredTypeAttributes(
                        ExtractionUtils.AssigningConstructor constructor,
                        Map<String, DataType> fieldDataTypes) {
                return Optional.ofNullable(constructor)
@@ -572,11 +560,8 @@ public final class DataTypeExtractor {
                                // field order is sorted
                                return 
fieldDataTypes.keySet().stream().sorted();
                        })
-                       .map(name -> {
-                               final LogicalType logicalType = 
fieldDataTypes.get(name).getLogicalType();
-                               return new StructuredAttribute(name, 
logicalType);
-                       })
-                       .collect(Collectors.toList());
+                       .map(name -> DataTypes.FIELD(name, 
fieldDataTypes.get(name)))
+                       .toArray(DataTypes.Field[]::new);
        }
 
        /**
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index e814fb6..2ed11ff 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -146,6 +146,23 @@ public final class ExtractionUtils {
        }
 
        /**
+        * Validates the characteristics of a class for a {@link 
StructuredType} such as accessibility.
+        */
+       public static void validateStructuredClass(Class<?> clazz) {
+               final int m = clazz.getModifiers();
+               if (Modifier.isAbstract(m)) {
+                       throw extractionError("Class '%s' must not be 
abstract.", clazz.getName());
+               }
+               if (!Modifier.isPublic(m)) {
+                       throw extractionError("Class '%s' is not public.", 
clazz.getName());
+               }
+               if (clazz.getEnclosingClass() != null &&
+                               (clazz.getDeclaringClass() == null || 
!Modifier.isStatic(m))) {
+                       throw extractionError("Class '%s' is a not a static, 
globally accessible class.", clazz.getName());
+               }
+       }
+
+       /**
         * Returns the field of a structured type. The logic is as broad as 
possible to support
         * both Java and Scala in different flavors.
         */
@@ -434,23 +451,6 @@ public final class ExtractionUtils {
        }
 
        /**
-        * Validates the characteristics of a class for a {@link 
StructuredType} such as accessibility.
-        */
-       static void validateStructuredClass(Class<?> clazz) {
-               final int m = clazz.getModifiers();
-               if (Modifier.isAbstract(m)) {
-                       throw extractionError("Class '%s' must not be 
abstract.", clazz.getName());
-               }
-               if (!Modifier.isPublic(m)) {
-                       throw extractionError("Class '%s' is not public.", 
clazz.getName());
-               }
-               if (clazz.getEnclosingClass() != null &&
-                               (clazz.getDeclaringClass() == null || 
!Modifier.isStatic(m))) {
-                       throw extractionError("Class '%s' is a not a static, 
globally accessible class.", clazz.getName());
-               }
-       }
-
-       /**
         * Validates if a given type is not already contained in the type 
hierarchy of a structured type.
         *
         * <p>Otherwise this would lead to infinite data type extraction cycles.
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index 072534b..6552b80 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -357,7 +357,15 @@ public class DataTypesTest {
                                .forUnresolvedDataType(RAW(Object.class))
                                
.expectUnresolvedString("[RAW('java.lang.Object', '?')]")
                                .lookupReturns(DataTypes.RAW(new 
GenericTypeInfo<>(Object.class)))
-                               .expectResolvedDataType(DataTypes.RAW(new 
GenericTypeInfo<>(Object.class)))
+                               .expectResolvedDataType(DataTypes.RAW(new 
GenericTypeInfo<>(Object.class))),
+
+                       TestSpec
+                               
.forUnresolvedDataType(DataTypes.of(SimplePojo.class))
+                               .expectResolvedDataType(
+                                       DataTypes.STRUCTURED(
+                                               SimplePojo.class,
+                                               DataTypes.FIELD("name", 
DataTypes.STRING()),
+                                               DataTypes.FIELD("count", 
DataTypes.INT().notNull().bridgedTo(int.class))))
                );
        }
 
@@ -475,4 +483,21 @@ public class DataTypesTest {
                        return abstractDataType.toString();
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper classes
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Simple POJO for testing.
+        */
+       public static class SimplePojo {
+               public final String name;
+               public final int count;
+
+               public SimplePojo(String name, int count) {
+                       this.name = name;
+                       this.count = count;
+               }
+       }
 }

Reply via email to