This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c384aea [FLINK-12253][table-common] Add an ANY type c384aea is described below commit c384aea8116aa9ad514bbb59fd8578bb285a4e6f Author: Timo Walther <twal...@apache.org> AuthorDate: Thu May 2 14:51:54 2019 +0200 [FLINK-12253][table-common] Add an ANY type --- .../apache/flink/table/types/logical/AnyType.java | 157 +++++++++++++++++++++ .../table/types/logical/LogicalTypeVisitor.java | 5 + .../apache/flink/table/types/logical/NullType.java | 4 +- .../types/logical/TypeInformationAnyType.java | 148 +++++++++++++++++++ .../apache/flink/table/utils/EncodingUtils.java | 6 +- .../apache/flink/table/types/LogicalTypesTest.java | 44 ++++++ 6 files changed, 361 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java new file mode 100644 index 0000000..4f17539 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem + * and is only deserialized at the edges. The any type is an extension to the SQL standard. + * + * <p>The serialized string representation is {@code ANY(c, s)} where {@code c} is the originating + * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding. + * + * @param <T> originating class for this type + */ +@PublicEvolving +public final class AnyType<T> extends LogicalType { + + private static final String FORMAT = "ANY(%s, %s)"; + + private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet( + byte[].class.getName(), + "org.apache.flink.table.dataformat.BinaryGeneric"); + + private final Class<T> clazz; + + private final TypeSerializer<T> serializer; + + private transient String serializerString; + + public AnyType(boolean isNullable, Class<T> clazz, TypeSerializer<T> serializer) { + super(isNullable, LogicalTypeRoot.ANY); + this.clazz = Preconditions.checkNotNull(clazz, "Class must not be null."); + this.serializer = Preconditions.checkNotNull(serializer, "Serializer must not be null."); + } + + public AnyType(Class<T> clazz, TypeSerializer<T> serializer) { + this(true, clazz, serializer); + } + + public Class<T> getOriginatingClass() { + return clazz; + } + + public TypeSerializer<T> getTypeSerializer() { + return serializer; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new AnyType<>(isNullable, clazz, serializer.duplicate()); + } + + @Override + public String asSummaryString() { + return withNullability(FORMAT, clazz.getName(), "..."); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT, clazz.getName(), getOrCreateSerializerString()); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return this.clazz.isAssignableFrom(clazz) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return clazz.isAssignableFrom(this.clazz) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + return clazz; + } + + @Override + public List<LogicalType> getChildren() { + return Collections.emptyList(); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + AnyType<?> anyType = (AnyType<?>) o; + return clazz.equals(anyType.clazz) && serializer.equals(anyType.serializer); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), clazz, serializer); + } + + // -------------------------------------------------------------------------------------------- + + private String getOrCreateSerializerString() { + if (serializerString == null) { + final DataOutputSerializer outputSerializer = new DataOutputSerializer(128); + try { + serializer.snapshotConfiguration().writeSnapshot(outputSerializer); + serializerString = EncodingUtils.encodeBytesToBase64(outputSerializer.getCopyOfBuffer()); + return serializerString; + } catch (Exception e) { + throw new TableException(String.format( + "Unable to generate a string representation of the serializer snapshot of '%s' " + + "describing the class '%s' for the ANY type.", + serializer.getClass().getName(), + clazz.toString())); + } + } + return serializerString; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java index bab6a96..9a3190f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java @@ -24,6 +24,9 @@ import org.apache.flink.annotation.PublicEvolving; * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into * instances of {@code R}. * + * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited through the generic + * {@link #visit(LogicalType)}. + * * @param <R> result type */ @PublicEvolving @@ -81,5 +84,7 @@ public interface LogicalTypeVisitor<R> { R visit(NullType nullType); + R visit(AnyType anyType); + R visit(LogicalType other); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java index 827fc28..17cbc69 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java @@ -37,7 +37,7 @@ import java.util.List; @PublicEvolving public final class NullType extends LogicalType { - private static final String DEFAULT_FORMAT = "NULL"; + private static final String FORMAT = "NULL"; private static final Class<?> INPUT_CONVERSION = Object.class; @@ -59,7 +59,7 @@ public final class NullType extends LogicalType { @Override public String asSerializableString() { - return DEFAULT_FORMAT; + return FORMAT; } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java new file mode 100644 index 0000000..2c1c0dc --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Placeholder type of an arbitrary serialized type backed by {@link TypeInformation}. This type is + * a black box within the table ecosystem and is only deserialized at the edges. The any type is an + * extension to the SQL standard. + * + * <p>Compared to an {@link AnyType}, this type does not contain a {@link TypeSerializer} yet. The + * serializer will be generated from the enclosed {@link TypeInformation} but needs access to the + * {@link ExecutionConfig} of the current execution environment. Thus, this type is just a placeholder + * for the fully resolved {@link AnyType} returned by {@link #resolve(ExecutionConfig)}. + * + * <p>This type has no serializable string representation. + * + * <p>If no type information is supplied, generic type serialization for {@link Object} is used. + */ +@PublicEvolving +public final class TypeInformationAnyType<T> extends LogicalType { + + private static final String FORMAT = "ANY(%s, ?)"; + + private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet( + byte[].class.getName(), + "org.apache.flink.table.dataformat.BinaryGeneric"); + + private static final TypeInformation<?> DEFAULT_TYPE_INFO = Types.GENERIC(Object.class); + + private final TypeInformation<T> typeInfo; + + public TypeInformationAnyType(boolean isNullable, TypeInformation<T> typeInfo) { + super(isNullable, LogicalTypeRoot.ANY); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + public TypeInformationAnyType(TypeInformation<T> typeInfo) { + this(true, typeInfo); + } + + @SuppressWarnings("unchecked") + public TypeInformationAnyType() { + this(true, (TypeInformation<T>) DEFAULT_TYPE_INFO); + } + + public TypeInformation<T> getTypeInformation() { + return typeInfo; + } + + @Internal + public AnyType<T> resolve(ExecutionConfig config) { + return new AnyType<>(isNullable(), typeInfo.getTypeClass(), typeInfo.createSerializer(config)); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new TypeInformationAnyType<>(isNullable, typeInfo); // we must assume immutability here + } + + @Override + public String asSummaryString() { + return withNullability(FORMAT, typeInfo.getTypeClass().getName()); + } + + @Override + public String asSerializableString() { + throw new TableException( + "An any type backed by type information has no serializable string representation. It " + + "needs to be resolved into a proper any type."); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return typeInfo.getTypeClass().isAssignableFrom(clazz) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return clazz.isAssignableFrom(typeInfo.getTypeClass()) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + return typeInfo.getTypeClass(); + } + + @Override + public List<LogicalType> getChildren() { + return Collections.emptyList(); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + TypeInformationAnyType<?> that = (TypeInformationAnyType<?>) o; + return typeInfo.equals(that.typeInfo); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), typeInfo); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java index 5531082..35e57ae 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java @@ -93,8 +93,12 @@ public abstract class EncodingUtils { return loadClass(qualifiedName, Thread.currentThread().getContextClassLoader()); } + public static String encodeBytesToBase64(byte[] bytes) { + return new String(java.util.Base64.getEncoder().encode(bytes), UTF_8); + } + public static String encodeStringToBase64(String string) { - return new String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8); + return encodeBytesToBase64(string.getBytes(UTF_8)); } public static String decodeBase64ToString(String base64) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java index ec452a8..e03e15d 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java @@ -18,6 +18,12 @@ package org.apache.flink.table.types; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.table.types.logical.AnyType; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BinaryType; @@ -41,6 +47,7 @@ import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.TypeInformationAnyType; import org.apache.flink.table.types.logical.UserDefinedType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; @@ -454,6 +461,43 @@ public class LogicalTypesTest { assertFalse(nullType.supportsOutputConversion(int.class)); } + @Test + public void testTypeInformationAnyType() { + final TypeInformationAnyType<?> anyType = new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.INT)); + + testEquality(anyType, new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG))); + + testStringSummary(anyType, "ANY(org.apache.flink.api.java.tuple.Tuple2, ?)"); + + testNullability(anyType); + + testJavaSerializability(anyType); + + testConversions(anyType, new Class[]{Tuple2.class}, new Class[]{Tuple.class}); + } + + @Test + public void testAnyType() { + testAll( + new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig())), + "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " + + "ADNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4AAATyxpo9cAA" + + "AAAIAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuTG9naWNhbFR5cGVzVGVzdCRIdW1hbgEAAAA1AD" + + "NvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4BAAAAOQAzb3JnL" + + "mFwYWNoZS5mbGluay50YWJsZS50eXBlcy5Mb2dpY2FsVHlwZXNUZXN0JEh1bWFuAAAAAAApb3JnLmFwYWNo" + + "ZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBAAAAKwApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWM" + + "uR2VuZXJpY0RhdGEkQXJyYXkBAAAAtgBVb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMucn" + + "VudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb1JlZ2lzdGVyZWRDbGFzcwAAAAEAWW9yZy5hcGFja" + + "GUuZmxpbmsuYXBpLmphdmEudHlwZXV0aWxzLnJ1bnRpbWUua3J5by5TZXJpYWxpemVycyREdW1teUF2cm9L" + + "cnlvU2VyaWFsaXplckNsYXNzAAAE8saaPXAAAAAAAAAE8saaPXAAAAAA)", + "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, ...)", + new Class[]{Human.class, User.class}, // every User is Human + new Class[]{Human.class}, + new LogicalType[]{}, + new AnyType<>(User.class, new KryoSerializer<>(User.class, new ExecutionConfig())) + ); + } + // -------------------------------------------------------------------------------------------- private static void testAll(