This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fe6b460  [FLINK-12253][table-common] Add user-defined types
fe6b460 is described below

commit fe6b46053fac9c534adee72482112f583925b5fe
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu May 2 13:30:59 2019 +0200

    [FLINK-12253][table-common] Add user-defined types
---
 .../flink/table/types/logical/DistinctType.java    | 150 +++++++++
 .../table/types/logical/LogicalTypeVisitor.java    |   4 +
 .../flink/table/types/logical/StructuredType.java  | 344 +++++++++++++++++++++
 .../flink/table/types/logical/UserDefinedType.java | 178 +++++++++++
 .../apache/flink/table/types/LogicalTypesTest.java |  90 ++++++
 5 files changed, 766 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
new file mode 100644
index 0000000..80aa433
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Logical type of a user-defined distinct type. A distinct type specifies an 
identifier and is backed
+ * by a source type. A distinct type has the same internal representation as a 
source type, but is
+ * considered to be a separate and incompatible data type for most operations. 
Compared to the SQL
+ * standard, every non-user-defined type can be used as a source type.
+ *
+ * <p>A distinct type can always be cast to its source type and vice versa.
+ *
+ * <p>Distinct types are implicitly final and do not support super types.
+ *
+ * <p>Most other properties are forwarded from the source type. Thus, ordering 
and comparision among
+ * the same distinct types are supported.
+ *
+ * <p>The serialized string representation is the fully qualified name of this 
type which means that
+ * the type must have been registered in a catalog.
+ */
+@PublicEvolving
+public final class DistinctType extends UserDefinedType {
+
+       /**
+        * A builder for a {@link DistinctType}. Intended for future 
extensibility.
+        */
+       public static final class Builder {
+
+               private final TypeIdentifier typeIdentifier;
+
+               private final LogicalType sourceType;
+
+               private @Nullable String description;
+
+               public Builder(TypeIdentifier typeIdentifier, LogicalType 
sourceType) {
+                       this.typeIdentifier = 
Preconditions.checkNotNull(typeIdentifier, "Type identifier must not be null.");
+                       this.sourceType = 
Preconditions.checkNotNull(sourceType, "Source type must not be null.");
+
+                       Preconditions.checkArgument(
+                               
!sourceType.getTypeRoot().getFamilies().contains(LogicalTypeFamily.USER_DEFINED),
+                               "Source type must not be a user-defined type.");
+               }
+
+               public Builder setDescription(String description) {
+                       this.description = 
Preconditions.checkNotNull(description, "Description must not be null");
+                       return this;
+               }
+
+               public DistinctType build() {
+                       return new DistinctType(typeIdentifier, sourceType, 
description);
+               }
+       }
+
+       private final LogicalType sourceType;
+
+       private DistinctType(
+                       TypeIdentifier typeIdentifier,
+                       LogicalType sourceType,
+                       @Nullable String description) {
+               super(
+                       sourceType.isNullable(),
+                       LogicalTypeRoot.DISTINCT_TYPE,
+                       typeIdentifier,
+                       true,
+                       description);
+               this.sourceType = Preconditions.checkNotNull(sourceType, 
"Source type must not be null.");
+       }
+
+       public LogicalType getSourceType() {
+               return sourceType;
+       }
+
+       @Override
+       public LogicalType copy(boolean isNullable) {
+               return new DistinctType(
+                       getTypeIdentifier(),
+                       sourceType.copy(isNullable),
+                       getDescription().orElse(null));
+       }
+
+       @Override
+       public boolean supportsInputConversion(Class<?> clazz) {
+               return sourceType.supportsInputConversion(clazz);
+       }
+
+       @Override
+       public boolean supportsOutputConversion(Class<?> clazz) {
+               return sourceType.supportsOutputConversion(clazz);
+       }
+
+       @Override
+       public Class<?> getDefaultConversion() {
+               return sourceType.getDefaultConversion();
+       }
+
+       @Override
+       public List<LogicalType> getChildren() {
+               return Collections.singletonList(sourceType);
+       }
+
+       @Override
+       public <R> R accept(LogicalTypeVisitor<R> visitor) {
+               return visitor.visit(this);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+               DistinctType that = (DistinctType) o;
+               return sourceType.equals(that.sourceType);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(super.hashCode(), sourceType);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 2e1d3de..c574f4b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -75,5 +75,9 @@ public interface LogicalTypeVisitor<R> {
 
        R visit(RowType rowType);
 
+       R visit(DistinctType distinctType);
+
+       R visit(StructuredType structuredType);
+
        R visit(LogicalType other);
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
new file mode 100644
index 0000000..f76ad6b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Logical type of a user-defined object structured type. Structured types 
contain one or more
+ * attributes. Each attribute consists of a name and a type. A type cannot be 
defined so that one of
+ * its attribute types (transitively) uses itself.
+ *
+ * <p>A structured type can declare a super type and allows single inheritance 
for more complex type
+ * hierarchies, similar to JVM-based languages.
+ *
+ * <p>A structured type must be declared {@code final} for preventing further 
inheritance (default
+ * behavior) or {@code not final} for allowing subtypes.
+ *
+ * <p>A structured type must be declared {@code not instantiable} if a more 
specific type is
+ * required or {@code instantiable} if instances can be created from this type 
(default behavior).
+ *
+ * <p>A structured type declares comparision properties of either {@code none} 
(no equality),
+ * {@code equals} (only equality and inequality), or {@code full} (greater, 
equals, less).
+ *
+ * <p>NOTE: Compared to the SQL standard, this class is incomplete. We might 
add new features such
+ * as method declarations in the future. Also ordering is not supported yet.
+ */
+@PublicEvolving
+public final class StructuredType extends UserDefinedType {
+
+       private static final Set<String> INPUT_OUTPUT_CONVERSION = 
conversionSet(
+               Row.class.getName(),
+               "org.apache.flink.table.dataformat.BaseRow");
+
+       private static final Class<?> FALLBACK_CONVERSION = Row.class;
+
+       /**
+        * Defines an attribute of a {@link StructuredType}.
+        */
+       public static final class StructuredAttribute implements Serializable {
+
+               private final String name;
+
+               private final LogicalType type;
+
+               public StructuredAttribute(String name, LogicalType type) {
+                       this.name = Preconditions.checkNotNull(name, "Attribute 
name must not be null.");
+                       this.type = Preconditions.checkNotNull(type, "Attribute 
type must not be null.");
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               public LogicalType getType() {
+                       return type;
+               }
+
+               public StructuredAttribute copy() {
+                       return new StructuredAttribute(name, type.copy());
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       StructuredAttribute that = (StructuredAttribute) o;
+                       return name.equals(that.name) && type.equals(that.type);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(name, type);
+               }
+       }
+
+       /**
+        * Defines equality properties for scalar evaluation.
+        */
+       public enum StructuredComparision {
+               EQUALS,
+               FULL,
+               NONE
+       }
+
+       /**
+        * A builder for a {@link StructuredType}. Intended for future 
extensibility.
+        */
+       public static class Builder {
+
+               private final TypeIdentifier typeIdentifier;
+
+               private final List<StructuredAttribute> attributes;
+
+               private boolean isNullable = true;
+
+               private boolean isFinal = true;
+
+               private boolean isInstantiable = true;
+
+               private StructuredComparision comparision = 
StructuredComparision.NONE;
+
+               private @Nullable StructuredType superType;
+
+               private @Nullable String description;
+
+               private @Nullable Class<?> implementationClass;
+
+               public Builder(TypeIdentifier typeIdentifier, 
List<StructuredAttribute> attributes) {
+                       this.typeIdentifier = 
Preconditions.checkNotNull(typeIdentifier, "Type identifier must not be null.");
+                       this.attributes = Collections.unmodifiableList(
+                               new ArrayList<>(
+                                       Preconditions.checkNotNull(attributes, 
"Attributes must not be null.")));
+
+                       Preconditions.checkArgument(
+                               attributes.size() > 0,
+                               "Attribute list must not be empty.");
+               }
+
+               public Builder setNullable(boolean isNullable) {
+                       this.isNullable = isNullable;
+                       return this;
+               }
+
+               public Builder setDescription(String description) {
+                       this.description = 
Preconditions.checkNotNull(description, "Description must not be null.");
+                       return this;
+               }
+
+               public Builder setFinal(boolean isFinal) {
+                       this.isFinal = isFinal;
+                       return this;
+               }
+
+               public Builder setInstantiable(boolean isInstantiable) {
+                       this.isInstantiable = isInstantiable;
+                       return this;
+               }
+
+               public Builder setComparision(StructuredComparision 
comparision) {
+                       this.comparision = 
Preconditions.checkNotNull(comparision, "Comparision must not be null.");
+                       return this;
+               }
+
+               public Builder setSuperType(@Nullable StructuredType superType) 
{
+                       this.superType = Preconditions.checkNotNull(superType, 
"Super type must not be null.");
+                       return this;
+               }
+
+               public Builder setImplementationClass(Class<?> 
implementationClass) {
+                       this.implementationClass = 
Preconditions.checkNotNull(implementationClass, "Implementation class must not 
null.");
+                       return this;
+               }
+
+               public StructuredType build() {
+                       return new StructuredType(
+                               isNullable,
+                               typeIdentifier,
+                               attributes,
+                               isFinal,
+                               isInstantiable,
+                               comparision,
+                               superType,
+                               description,
+                               implementationClass);
+               }
+       }
+
+       private final List<StructuredAttribute> attributes;
+
+       private final boolean isInstantiable;
+
+       private final StructuredComparision comparision;
+
+       private final @Nullable StructuredType superType;
+
+       private final @Nullable Class<?> implementationClass;
+
+       private StructuredType(
+                       boolean isNullable,
+                       TypeIdentifier typeIdentifier,
+                       List<StructuredAttribute> attributes,
+                       boolean isFinal,
+                       boolean isInstantiable,
+                       StructuredComparision comparision,
+                       @Nullable StructuredType superType,
+                       @Nullable String description,
+                       @Nullable Class<?> implementationClass) {
+               super(
+                       isNullable,
+                       LogicalTypeRoot.STRUCTURED_TYPE,
+                       typeIdentifier,
+                       isFinal,
+                       description);
+
+               this.attributes = attributes;
+               this.isInstantiable = isInstantiable;
+               this.comparision = comparision;
+               this.superType = superType;
+               this.implementationClass = implementationClass;
+       }
+
+       public List<StructuredAttribute> getAttributes() {
+               return attributes;
+       }
+
+       public boolean isInstantiable() {
+               return isInstantiable;
+       }
+
+       public StructuredComparision getComparision() {
+               return comparision;
+       }
+
+       public Optional<StructuredType> getSuperType() {
+               return Optional.ofNullable(superType);
+       }
+
+       public Optional<Class<?>> getImplementationClass() {
+               return Optional.ofNullable(implementationClass);
+       }
+
+       @Override
+       public LogicalType copy(boolean isNullable) {
+               return new StructuredType(
+                       isNullable,
+                       getTypeIdentifier(),
+                       
attributes.stream().map(StructuredAttribute::copy).collect(Collectors.toList()),
+                       isFinal(),
+                       isInstantiable,
+                       comparision,
+                       superType == null ? null : (StructuredType) 
superType.copy(),
+                       getDescription().orElse(null),
+                       implementationClass);
+       }
+
+       @Override
+       public boolean supportsInputConversion(Class<?> clazz) {
+               return (implementationClass != null && 
implementationClass.isAssignableFrom(clazz)) ||
+                       INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+       }
+
+       @Override
+       public boolean supportsOutputConversion(Class<?> clazz) {
+               StructuredType currentType = this;
+               while (currentType != null) {
+                       if (currentType.implementationClass != null && 
clazz.isAssignableFrom(currentType.implementationClass)) {
+                               return true;
+                       }
+                       currentType = currentType.superType;
+               }
+               return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+       }
+
+       @Override
+       public Class<?> getDefaultConversion() {
+               if (implementationClass != null) {
+                       return implementationClass;
+               }
+               return FALLBACK_CONVERSION;
+       }
+
+       @Override
+       public List<LogicalType> getChildren() {
+               final ArrayList<LogicalType> children = new ArrayList<>();
+               StructuredType currentType = this;
+               while (currentType != null) {
+                       children.addAll(
+                               currentType.attributes.stream()
+                                       .map(StructuredAttribute::getType)
+                                       .collect(Collectors.toList()));
+                       currentType = currentType.superType;
+               }
+               Collections.reverse(children);
+               return Collections.unmodifiableList(children);
+       }
+
+       @Override
+       public <R> R accept(LogicalTypeVisitor<R> visitor) {
+               return visitor.visit(this);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+               StructuredType that = (StructuredType) o;
+               return isInstantiable == that.isInstantiable &&
+                       attributes.equals(that.attributes) &&
+                       comparision == that.comparision &&
+                       Objects.equals(superType, that.superType) &&
+                       Objects.equals(implementationClass, 
that.implementationClass);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       super.hashCode(),
+                       attributes,
+                       isInstantiable,
+                       comparision,
+                       superType,
+                       implementationClass);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
new file mode 100644
index 0000000..4509ab9
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Logical type of a user-defined representation for one or more built-in 
types. A user-defined
+ * type is either a distinct type or a structured type.
+ *
+ * <p>A {@link UserDefinedType} instance is the result of a catalog lookup or 
an explicit definition.
+ * Therefore, the serialized string representation is a unique {@link 
TypeIdentifier}.
+ *
+ * <p>NOTE: Compared to the SQL standard, this class and subclasses are 
incomplete. We might add new
+ * features such as method declarations in the future.
+ *
+ * @see DistinctType
+ * @see StructuredType
+ */
+@PublicEvolving
+public abstract class UserDefinedType extends LogicalType {
+
+       /**
+        * Fully qualifies a user-defined type. Two user-defined types are 
considered equal if they
+        * share the same type identifier.
+        */
+       public static final class TypeIdentifier implements Serializable {
+
+               private @Nullable String catalogName;
+
+               private @Nullable String databaseName;
+
+               private String typeName;
+
+               public TypeIdentifier(
+                               @Nullable String catalogName,
+                               @Nullable String databaseName,
+                               String typeName) {
+                       this.catalogName = catalogName;
+                       this.databaseName = databaseName;
+                       this.typeName = Preconditions.checkNotNull(typeName, 
"Type name must not be null.");
+               }
+
+               public TypeIdentifier(@Nullable String databaseName, String 
typeName) {
+                       this(null, databaseName, typeName);
+               }
+
+               public TypeIdentifier(String typeName) {
+                       this(null, null, typeName);
+               }
+
+               public Optional<String> getCatalogName() {
+                       return Optional.ofNullable(catalogName);
+               }
+
+               public Optional<String> getDatabaseName() {
+                       return Optional.ofNullable(databaseName);
+               }
+
+               public String getTypeName() {
+                       return typeName;
+               }
+
+               @Override
+               public String toString() {
+                       final StringBuilder sb = new StringBuilder();
+                       if (catalogName != null) {
+                               sb.append(escapeIdentifier(catalogName));
+                               sb.append('.');
+                       }
+                       if (databaseName != null) {
+                               sb.append(escapeIdentifier(databaseName));
+                               sb.append('.');
+                       }
+                       sb.append(escapeIdentifier(typeName));
+                       return sb.toString();
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       TypeIdentifier that = (TypeIdentifier) o;
+                       return Objects.equals(catalogName, that.catalogName) &&
+                               Objects.equals(databaseName, that.databaseName) 
&&
+                               typeName.equals(that.typeName);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(catalogName, databaseName, 
typeName);
+               }
+       }
+
+       private final TypeIdentifier typeIdentifier;
+
+       private final boolean isFinal;
+
+       private final @Nullable String description;
+
+       UserDefinedType(
+                       boolean isNullable,
+                       LogicalTypeRoot typeRoot,
+                       TypeIdentifier typeIdentifier,
+                       boolean isFinal,
+                       @Nullable String description) {
+               super(isNullable, typeRoot);
+               this.typeIdentifier = 
Preconditions.checkNotNull(typeIdentifier, "Type identifier must not be null.");
+               this.isFinal = isFinal;
+               this.description = description;
+       }
+
+       public TypeIdentifier getTypeIdentifier() {
+               return typeIdentifier;
+       }
+
+       public boolean isFinal() {
+               return isFinal;
+       }
+
+       public Optional<String> getDescription() {
+               return Optional.ofNullable(description);
+       }
+
+       @Override
+       public String asSerializableString() {
+               return withNullability(typeIdentifier.toString());
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+               UserDefinedType that = (UserDefinedType) o;
+               return isFinal == that.isFinal &&
+                       typeIdentifier.equals(that.typeIdentifier) &&
+                       Objects.equals(description, that.description);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(super.hashCode(), typeIdentifier, isFinal, 
description);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index fc339af..8d0e169 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DayTimeIntervalType;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
@@ -35,9 +36,11 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UserDefinedType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
@@ -50,6 +53,7 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -391,6 +395,43 @@ public class LogicalTypesTest {
                );
        }
 
+       @Test
+       public void testDistinctType() {
+               testAll(
+                       createDistinctType("Money"),
+                       "`cat`.`db`.`Money`",
+                       "`cat`.`db`.`Money`",
+                       new Class[]{BigDecimal.class},
+                       new Class[]{BigDecimal.class},
+                       new LogicalType[]{new DecimalType(10, 2)},
+                       createDistinctType("Monetary")
+               );
+       }
+
+       @Test
+       public void testStructuredType() {
+               testAll(
+                       createUserType(true),
+                       "`cat`.`db`.`User`",
+                       "`cat`.`db`.`User`",
+                       new Class[]{Row.class, User.class},
+                       new Class[]{Row.class, Human.class, User.class},
+                       new LogicalType[]{UDT_NAME_TYPE, UDT_SETTING_TYPE},
+                       createUserType(false)
+               );
+
+               testConversions(
+                       createHumanType(false),
+                       new Class[]{Row.class, Human.class, User.class}, // 
every User is Human
+                       new Class[]{Row.class, Human.class});
+
+               // not every Human is User
+               
assertFalse(createUserType(true).supportsInputConversion(Human.class));
+
+               // User is not implementing SpecialHuman
+               
assertFalse(createHumanType(true).supportsInputConversion(User.class));
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private static void testAll(
@@ -478,4 +519,53 @@ public class LogicalTypesTest {
        private static void testChildren(LogicalType type, LogicalType[] 
children) {
                assertEquals(Arrays.asList(children), type.getChildren());
        }
+
+       private DistinctType createDistinctType(String typeName) {
+               return new DistinctType.Builder(
+                               new UserDefinedType.TypeIdentifier("cat", "db", 
typeName),
+                               new DecimalType(10, 2))
+                       .setDescription("Money type desc.")
+                       .build();
+       }
+
+       private static final LogicalType UDT_NAME_TYPE = new VarCharType();
+
+       private static final LogicalType UDT_SETTING_TYPE = new IntType();
+
+       private StructuredType createHumanType(boolean 
useDifferentImplementation) {
+               return new StructuredType.Builder(
+                               new UserDefinedType.TypeIdentifier("cat", "db", 
"Human"),
+                               Collections.singletonList(
+                                       new 
StructuredType.StructuredAttribute("name", UDT_NAME_TYPE)))
+                       .setDescription("Human type desc.")
+                       .setFinal(false)
+                       .setInstantiable(false)
+                       .setImplementationClass(useDifferentImplementation ? 
SpecialHuman.class : Human.class)
+                       .build();
+       }
+
+       private StructuredType createUserType(boolean isFinal) {
+               return new StructuredType.Builder(
+                               new UserDefinedType.TypeIdentifier("cat", "db", 
"User"),
+                               Collections.singletonList(
+                                       new 
StructuredType.StructuredAttribute("setting", UDT_SETTING_TYPE)))
+                       .setDescription("User type desc.")
+                       .setFinal(isFinal)
+                       .setInstantiable(true)
+                       .setImplementationClass(User.class)
+                       .setSuperType(createHumanType(false))
+                       .build();
+       }
+
+       private abstract static class SpecialHuman {
+               public String name;
+       }
+
+       private abstract static class Human {
+               public String name;
+       }
+
+       private static final class User extends Human {
+               public int setting;
+       }
 }

Reply via email to