This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new fe6b460 [FLINK-12253][table-common] Add user-defined types fe6b460 is described below commit fe6b46053fac9c534adee72482112f583925b5fe Author: Timo Walther <twal...@apache.org> AuthorDate: Thu May 2 13:30:59 2019 +0200 [FLINK-12253][table-common] Add user-defined types --- .../flink/table/types/logical/DistinctType.java | 150 +++++++++ .../table/types/logical/LogicalTypeVisitor.java | 4 + .../flink/table/types/logical/StructuredType.java | 344 +++++++++++++++++++++ .../flink/table/types/logical/UserDefinedType.java | 178 +++++++++++ .../apache/flink/table/types/LogicalTypesTest.java | 90 ++++++ 5 files changed, 766 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java new file mode 100644 index 0000000..80aa433 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Logical type of a user-defined distinct type. A distinct type specifies an identifier and is backed + * by a source type. A distinct type has the same internal representation as a source type, but is + * considered to be a separate and incompatible data type for most operations. Compared to the SQL + * standard, every non-user-defined type can be used as a source type. + * + * <p>A distinct type can always be cast to its source type and vice versa. + * + * <p>Distinct types are implicitly final and do not support super types. + * + * <p>Most other properties are forwarded from the source type. Thus, ordering and comparision among + * the same distinct types are supported. + * + * <p>The serialized string representation is the fully qualified name of this type which means that + * the type must have been registered in a catalog. + */ +@PublicEvolving +public final class DistinctType extends UserDefinedType { + + /** + * A builder for a {@link DistinctType}. Intended for future extensibility. + */ + public static final class Builder { + + private final TypeIdentifier typeIdentifier; + + private final LogicalType sourceType; + + private @Nullable String description; + + public Builder(TypeIdentifier typeIdentifier, LogicalType sourceType) { + this.typeIdentifier = Preconditions.checkNotNull(typeIdentifier, "Type identifier must not be null."); + this.sourceType = Preconditions.checkNotNull(sourceType, "Source type must not be null."); + + Preconditions.checkArgument( + !sourceType.getTypeRoot().getFamilies().contains(LogicalTypeFamily.USER_DEFINED), + "Source type must not be a user-defined type."); + } + + public Builder setDescription(String description) { + this.description = Preconditions.checkNotNull(description, "Description must not be null"); + return this; + } + + public DistinctType build() { + return new DistinctType(typeIdentifier, sourceType, description); + } + } + + private final LogicalType sourceType; + + private DistinctType( + TypeIdentifier typeIdentifier, + LogicalType sourceType, + @Nullable String description) { + super( + sourceType.isNullable(), + LogicalTypeRoot.DISTINCT_TYPE, + typeIdentifier, + true, + description); + this.sourceType = Preconditions.checkNotNull(sourceType, "Source type must not be null."); + } + + public LogicalType getSourceType() { + return sourceType; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new DistinctType( + getTypeIdentifier(), + sourceType.copy(isNullable), + getDescription().orElse(null)); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return sourceType.supportsInputConversion(clazz); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return sourceType.supportsOutputConversion(clazz); + } + + @Override + public Class<?> getDefaultConversion() { + return sourceType.getDefaultConversion(); + } + + @Override + public List<LogicalType> getChildren() { + return Collections.singletonList(sourceType); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DistinctType that = (DistinctType) o; + return sourceType.equals(that.sourceType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), sourceType); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java index 2e1d3de..c574f4b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java @@ -75,5 +75,9 @@ public interface LogicalTypeVisitor<R> { R visit(RowType rowType); + R visit(DistinctType distinctType); + + R visit(StructuredType structuredType); + R visit(LogicalType other); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java new file mode 100644 index 0000000..f76ad6b --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Logical type of a user-defined object structured type. Structured types contain one or more + * attributes. Each attribute consists of a name and a type. A type cannot be defined so that one of + * its attribute types (transitively) uses itself. + * + * <p>A structured type can declare a super type and allows single inheritance for more complex type + * hierarchies, similar to JVM-based languages. + * + * <p>A structured type must be declared {@code final} for preventing further inheritance (default + * behavior) or {@code not final} for allowing subtypes. + * + * <p>A structured type must be declared {@code not instantiable} if a more specific type is + * required or {@code instantiable} if instances can be created from this type (default behavior). + * + * <p>A structured type declares comparision properties of either {@code none} (no equality), + * {@code equals} (only equality and inequality), or {@code full} (greater, equals, less). + * + * <p>NOTE: Compared to the SQL standard, this class is incomplete. We might add new features such + * as method declarations in the future. Also ordering is not supported yet. + */ +@PublicEvolving +public final class StructuredType extends UserDefinedType { + + private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet( + Row.class.getName(), + "org.apache.flink.table.dataformat.BaseRow"); + + private static final Class<?> FALLBACK_CONVERSION = Row.class; + + /** + * Defines an attribute of a {@link StructuredType}. + */ + public static final class StructuredAttribute implements Serializable { + + private final String name; + + private final LogicalType type; + + public StructuredAttribute(String name, LogicalType type) { + this.name = Preconditions.checkNotNull(name, "Attribute name must not be null."); + this.type = Preconditions.checkNotNull(type, "Attribute type must not be null."); + } + + public String getName() { + return name; + } + + public LogicalType getType() { + return type; + } + + public StructuredAttribute copy() { + return new StructuredAttribute(name, type.copy()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StructuredAttribute that = (StructuredAttribute) o; + return name.equals(that.name) && type.equals(that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, type); + } + } + + /** + * Defines equality properties for scalar evaluation. + */ + public enum StructuredComparision { + EQUALS, + FULL, + NONE + } + + /** + * A builder for a {@link StructuredType}. Intended for future extensibility. + */ + public static class Builder { + + private final TypeIdentifier typeIdentifier; + + private final List<StructuredAttribute> attributes; + + private boolean isNullable = true; + + private boolean isFinal = true; + + private boolean isInstantiable = true; + + private StructuredComparision comparision = StructuredComparision.NONE; + + private @Nullable StructuredType superType; + + private @Nullable String description; + + private @Nullable Class<?> implementationClass; + + public Builder(TypeIdentifier typeIdentifier, List<StructuredAttribute> attributes) { + this.typeIdentifier = Preconditions.checkNotNull(typeIdentifier, "Type identifier must not be null."); + this.attributes = Collections.unmodifiableList( + new ArrayList<>( + Preconditions.checkNotNull(attributes, "Attributes must not be null."))); + + Preconditions.checkArgument( + attributes.size() > 0, + "Attribute list must not be empty."); + } + + public Builder setNullable(boolean isNullable) { + this.isNullable = isNullable; + return this; + } + + public Builder setDescription(String description) { + this.description = Preconditions.checkNotNull(description, "Description must not be null."); + return this; + } + + public Builder setFinal(boolean isFinal) { + this.isFinal = isFinal; + return this; + } + + public Builder setInstantiable(boolean isInstantiable) { + this.isInstantiable = isInstantiable; + return this; + } + + public Builder setComparision(StructuredComparision comparision) { + this.comparision = Preconditions.checkNotNull(comparision, "Comparision must not be null."); + return this; + } + + public Builder setSuperType(@Nullable StructuredType superType) { + this.superType = Preconditions.checkNotNull(superType, "Super type must not be null."); + return this; + } + + public Builder setImplementationClass(Class<?> implementationClass) { + this.implementationClass = Preconditions.checkNotNull(implementationClass, "Implementation class must not null."); + return this; + } + + public StructuredType build() { + return new StructuredType( + isNullable, + typeIdentifier, + attributes, + isFinal, + isInstantiable, + comparision, + superType, + description, + implementationClass); + } + } + + private final List<StructuredAttribute> attributes; + + private final boolean isInstantiable; + + private final StructuredComparision comparision; + + private final @Nullable StructuredType superType; + + private final @Nullable Class<?> implementationClass; + + private StructuredType( + boolean isNullable, + TypeIdentifier typeIdentifier, + List<StructuredAttribute> attributes, + boolean isFinal, + boolean isInstantiable, + StructuredComparision comparision, + @Nullable StructuredType superType, + @Nullable String description, + @Nullable Class<?> implementationClass) { + super( + isNullable, + LogicalTypeRoot.STRUCTURED_TYPE, + typeIdentifier, + isFinal, + description); + + this.attributes = attributes; + this.isInstantiable = isInstantiable; + this.comparision = comparision; + this.superType = superType; + this.implementationClass = implementationClass; + } + + public List<StructuredAttribute> getAttributes() { + return attributes; + } + + public boolean isInstantiable() { + return isInstantiable; + } + + public StructuredComparision getComparision() { + return comparision; + } + + public Optional<StructuredType> getSuperType() { + return Optional.ofNullable(superType); + } + + public Optional<Class<?>> getImplementationClass() { + return Optional.ofNullable(implementationClass); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new StructuredType( + isNullable, + getTypeIdentifier(), + attributes.stream().map(StructuredAttribute::copy).collect(Collectors.toList()), + isFinal(), + isInstantiable, + comparision, + superType == null ? null : (StructuredType) superType.copy(), + getDescription().orElse(null), + implementationClass); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return (implementationClass != null && implementationClass.isAssignableFrom(clazz)) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + StructuredType currentType = this; + while (currentType != null) { + if (currentType.implementationClass != null && clazz.isAssignableFrom(currentType.implementationClass)) { + return true; + } + currentType = currentType.superType; + } + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + if (implementationClass != null) { + return implementationClass; + } + return FALLBACK_CONVERSION; + } + + @Override + public List<LogicalType> getChildren() { + final ArrayList<LogicalType> children = new ArrayList<>(); + StructuredType currentType = this; + while (currentType != null) { + children.addAll( + currentType.attributes.stream() + .map(StructuredAttribute::getType) + .collect(Collectors.toList())); + currentType = currentType.superType; + } + Collections.reverse(children); + return Collections.unmodifiableList(children); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + StructuredType that = (StructuredType) o; + return isInstantiable == that.isInstantiable && + attributes.equals(that.attributes) && + comparision == that.comparision && + Objects.equals(superType, that.superType) && + Objects.equals(implementationClass, that.implementationClass); + } + + @Override + public int hashCode() { + return Objects.hash( + super.hashCode(), + attributes, + isInstantiable, + comparision, + superType, + implementationClass); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java new file mode 100644 index 0000000..4509ab9 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UserDefinedType.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Optional; + +/** + * Logical type of a user-defined representation for one or more built-in types. A user-defined + * type is either a distinct type or a structured type. + * + * <p>A {@link UserDefinedType} instance is the result of a catalog lookup or an explicit definition. + * Therefore, the serialized string representation is a unique {@link TypeIdentifier}. + * + * <p>NOTE: Compared to the SQL standard, this class and subclasses are incomplete. We might add new + * features such as method declarations in the future. + * + * @see DistinctType + * @see StructuredType + */ +@PublicEvolving +public abstract class UserDefinedType extends LogicalType { + + /** + * Fully qualifies a user-defined type. Two user-defined types are considered equal if they + * share the same type identifier. + */ + public static final class TypeIdentifier implements Serializable { + + private @Nullable String catalogName; + + private @Nullable String databaseName; + + private String typeName; + + public TypeIdentifier( + @Nullable String catalogName, + @Nullable String databaseName, + String typeName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.typeName = Preconditions.checkNotNull(typeName, "Type name must not be null."); + } + + public TypeIdentifier(@Nullable String databaseName, String typeName) { + this(null, databaseName, typeName); + } + + public TypeIdentifier(String typeName) { + this(null, null, typeName); + } + + public Optional<String> getCatalogName() { + return Optional.ofNullable(catalogName); + } + + public Optional<String> getDatabaseName() { + return Optional.ofNullable(databaseName); + } + + public String getTypeName() { + return typeName; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + if (catalogName != null) { + sb.append(escapeIdentifier(catalogName)); + sb.append('.'); + } + if (databaseName != null) { + sb.append(escapeIdentifier(databaseName)); + sb.append('.'); + } + sb.append(escapeIdentifier(typeName)); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TypeIdentifier that = (TypeIdentifier) o; + return Objects.equals(catalogName, that.catalogName) && + Objects.equals(databaseName, that.databaseName) && + typeName.equals(that.typeName); + } + + @Override + public int hashCode() { + return Objects.hash(catalogName, databaseName, typeName); + } + } + + private final TypeIdentifier typeIdentifier; + + private final boolean isFinal; + + private final @Nullable String description; + + UserDefinedType( + boolean isNullable, + LogicalTypeRoot typeRoot, + TypeIdentifier typeIdentifier, + boolean isFinal, + @Nullable String description) { + super(isNullable, typeRoot); + this.typeIdentifier = Preconditions.checkNotNull(typeIdentifier, "Type identifier must not be null."); + this.isFinal = isFinal; + this.description = description; + } + + public TypeIdentifier getTypeIdentifier() { + return typeIdentifier; + } + + public boolean isFinal() { + return isFinal; + } + + public Optional<String> getDescription() { + return Optional.ofNullable(description); + } + + @Override + public String asSerializableString() { + return withNullability(typeIdentifier.toString()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + UserDefinedType that = (UserDefinedType) o; + return isFinal == that.isFinal && + typeIdentifier.equals(that.typeIdentifier) && + Objects.equals(description, that.description); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), typeIdentifier, isFinal, description); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java index fc339af..8d0e169 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.DayTimeIntervalType; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.IntType; @@ -35,9 +36,11 @@ import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.UserDefinedType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.YearMonthIntervalType; @@ -50,6 +53,7 @@ import org.junit.Test; import java.math.BigDecimal; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -391,6 +395,43 @@ public class LogicalTypesTest { ); } + @Test + public void testDistinctType() { + testAll( + createDistinctType("Money"), + "`cat`.`db`.`Money`", + "`cat`.`db`.`Money`", + new Class[]{BigDecimal.class}, + new Class[]{BigDecimal.class}, + new LogicalType[]{new DecimalType(10, 2)}, + createDistinctType("Monetary") + ); + } + + @Test + public void testStructuredType() { + testAll( + createUserType(true), + "`cat`.`db`.`User`", + "`cat`.`db`.`User`", + new Class[]{Row.class, User.class}, + new Class[]{Row.class, Human.class, User.class}, + new LogicalType[]{UDT_NAME_TYPE, UDT_SETTING_TYPE}, + createUserType(false) + ); + + testConversions( + createHumanType(false), + new Class[]{Row.class, Human.class, User.class}, // every User is Human + new Class[]{Row.class, Human.class}); + + // not every Human is User + assertFalse(createUserType(true).supportsInputConversion(Human.class)); + + // User is not implementing SpecialHuman + assertFalse(createHumanType(true).supportsInputConversion(User.class)); + } + // -------------------------------------------------------------------------------------------- private static void testAll( @@ -478,4 +519,53 @@ public class LogicalTypesTest { private static void testChildren(LogicalType type, LogicalType[] children) { assertEquals(Arrays.asList(children), type.getChildren()); } + + private DistinctType createDistinctType(String typeName) { + return new DistinctType.Builder( + new UserDefinedType.TypeIdentifier("cat", "db", typeName), + new DecimalType(10, 2)) + .setDescription("Money type desc.") + .build(); + } + + private static final LogicalType UDT_NAME_TYPE = new VarCharType(); + + private static final LogicalType UDT_SETTING_TYPE = new IntType(); + + private StructuredType createHumanType(boolean useDifferentImplementation) { + return new StructuredType.Builder( + new UserDefinedType.TypeIdentifier("cat", "db", "Human"), + Collections.singletonList( + new StructuredType.StructuredAttribute("name", UDT_NAME_TYPE))) + .setDescription("Human type desc.") + .setFinal(false) + .setInstantiable(false) + .setImplementationClass(useDifferentImplementation ? SpecialHuman.class : Human.class) + .build(); + } + + private StructuredType createUserType(boolean isFinal) { + return new StructuredType.Builder( + new UserDefinedType.TypeIdentifier("cat", "db", "User"), + Collections.singletonList( + new StructuredType.StructuredAttribute("setting", UDT_SETTING_TYPE))) + .setDescription("User type desc.") + .setFinal(isFinal) + .setInstantiable(true) + .setImplementationClass(User.class) + .setSuperType(createHumanType(false)) + .build(); + } + + private abstract static class SpecialHuman { + public String name; + } + + private abstract static class Human { + public String name; + } + + private static final class User extends Human { + public int setting; + } }