This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c384aea  [FLINK-12253][table-common] Add an ANY type
c384aea is described below

commit c384aea8116aa9ad514bbb59fd8578bb285a4e6f
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu May 2 14:51:54 2019 +0200

    [FLINK-12253][table-common] Add an ANY type
---
 .../apache/flink/table/types/logical/AnyType.java  | 157 +++++++++++++++++++++
 .../table/types/logical/LogicalTypeVisitor.java    |   5 +
 .../apache/flink/table/types/logical/NullType.java |   4 +-
 .../types/logical/TypeInformationAnyType.java      | 148 +++++++++++++++++++
 .../apache/flink/table/utils/EncodingUtils.java    |   6 +-
 .../apache/flink/table/types/LogicalTypesTest.java |  44 ++++++
 6 files changed, 361 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
new file mode 100644
index 0000000..4f17539
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem
+ * and is only deserialized at the edges. The any type is an extension to the SQL standard.
+ *
+ * <p>The serialized string representation is {@code ANY(c, s)} where {@code c} is the originating
+ * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding.
+ *
+ * @param <T> originating class for this type
+ */
+@PublicEvolving
+public final class AnyType<T> extends LogicalType {
+
+       private static final String FORMAT = "ANY(%s, %s)";
+
+       // class names accepted for input and output conversion in addition to the originating class
+       private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+               byte[].class.getName(),
+               "org.apache.flink.table.dataformat.BinaryGeneric");
+
+       private final Class<T> clazz;
+
+       private final TypeSerializer<T> serializer;
+
+       // lazily created Base64 string of the serializer snapshot; transient, thus recomputed on demand
+       private transient String serializerString;
+
+       /**
+        * Creates an ANY type with explicit nullability.
+        *
+        * @param isNullable whether the type accepts {@code null}
+        * @param clazz originating class, must not be {@code null}
+        * @param serializer serializer for instances of the originating class, must not be {@code null}
+        */
+       public AnyType(boolean isNullable, Class<T> clazz, TypeSerializer<T> serializer) {
+               super(isNullable, LogicalTypeRoot.ANY);
+               this.clazz = Preconditions.checkNotNull(clazz, "Class must not be null.");
+               this.serializer = Preconditions.checkNotNull(serializer, "Serializer must not be null.");
+       }
+
+       /**
+        * Creates a nullable ANY type.
+        *
+        * @param clazz originating class, must not be {@code null}
+        * @param serializer serializer for instances of the originating class, must not be {@code null}
+        */
+       public AnyType(Class<T> clazz, TypeSerializer<T> serializer) {
+               this(true, clazz, serializer);
+       }
+
+       /**
+        * Returns the originating class that was passed at construction time.
+        */
+       public Class<T> getOriginatingClass() {
+               return clazz;
+       }
+
+       /**
+        * Returns the serializer that was passed at construction time.
+        */
+       public TypeSerializer<T> getTypeSerializer() {
+               return serializer;
+       }
+
+       @Override
+       public LogicalType copy(boolean isNullable) {
+               // the copy receives its own duplicate of the serializer instead of sharing the instance
+               return new AnyType<>(isNullable, clazz, serializer.duplicate());
+       }
+
+       @Override
+       public String asSummaryString() {
+               // the serializer part is abbreviated in the summary
+               return withNullability(FORMAT, clazz.getName(), "...");
+       }
+
+       /**
+        * Returns {@code ANY(c, s)} with the class name and the Base64-encoded serializer snapshot.
+        *
+        * @throws TableException if the serializer snapshot cannot be written
+        */
+       @Override
+       public String asSerializableString() {
+               return withNullability(FORMAT, clazz.getName(), getOrCreateSerializerString());
+       }
+
+       @Override
+       public boolean supportsInputConversion(Class<?> clazz) {
+               // accepts the originating class, its subclasses, and the additional conversion classes
+               return this.clazz.isAssignableFrom(clazz) ||
+                       INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+       }
+
+       @Override
+       public boolean supportsOutputConversion(Class<?> clazz) {
+               // accepts the originating class, its supertypes, and the additional conversion classes
+               return clazz.isAssignableFrom(this.clazz) ||
+                       INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+       }
+
+       @Override
+       public Class<?> getDefaultConversion() {
+               return clazz;
+       }
+
+       @Override
+       public List<LogicalType> getChildren() {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public <R> R accept(LogicalTypeVisitor<R> visitor) {
+               return visitor.visit(this);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+               AnyType<?> anyType = (AnyType<?>) o;
+               return clazz.equals(anyType.clazz) && serializer.equals(anyType.serializer);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(super.hashCode(), clazz, serializer);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the Base64 string of the serializer snapshot, creating and caching it on first use.
+        *
+        * <p>The cache is filled without synchronization; concurrent first calls may each compute the
+        * string.
+        *
+        * @throws TableException if writing the snapshot fails; the original failure is attached as cause
+        */
+       private String getOrCreateSerializerString() {
+               if (serializerString == null) {
+                       final DataOutputSerializer outputSerializer = new DataOutputSerializer(128);
+                       try {
+                               serializer.snapshotConfiguration().writeSnapshot(outputSerializer);
+                               serializerString = EncodingUtils.encodeBytesToBase64(outputSerializer.getCopyOfBuffer());
+                       } catch (Exception e) {
+                               // keep the underlying failure as the cause so that the root problem stays diagnosable
+                               throw new TableException(
+                                       String.format(
+                                               "Unable to generate a string representation of the serializer snapshot of '%s' " +
+                                                       "describing the class '%s' for the ANY type.",
+                                               serializer.getClass().getName(),
+                                               clazz.toString()),
+                                       e);
+                       }
+               }
+               return serializerString;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index bab6a96..9a3190f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,6 +24,9 @@ import org.apache.flink.annotation.PublicEvolving;
  * The visitor definition of {@link LogicalType}. The visitor transforms a 
logical type into
  * instances of {@code R}.
  *
+ * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited 
through the generic
+ * {@link #visit(LogicalType)}.
+ *
  * @param <R> result type
  */
 @PublicEvolving
@@ -81,5 +84,7 @@ public interface LogicalTypeVisitor<R> {
 
        R visit(NullType nullType);
 
+       /**
+        * Visits an {@link AnyType}. The wildcard keeps the signature free of raw types; implementations
+        * that declare the parameter as raw {@code AnyType} still override this method.
+        */
+       R visit(AnyType<?> anyType);
+
        R visit(LogicalType other);
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
index 827fc28..17cbc69 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/NullType.java
@@ -37,7 +37,7 @@ import java.util.List;
 @PublicEvolving
 public final class NullType extends LogicalType {
 
-       private static final String DEFAULT_FORMAT = "NULL";
+       private static final String FORMAT = "NULL";
 
        private static final Class<?> INPUT_CONVERSION = Object.class;
 
@@ -59,7 +59,7 @@ public final class NullType extends LogicalType {
 
        @Override
        public String asSerializableString() {
-               return DEFAULT_FORMAT;
+               return FORMAT;
        }
 
        @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
new file mode 100644
index 0000000..2c1c0dc
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Placeholder type of an arbitrary serialized type backed by {@link TypeInformation}. This type is
+ * a black box within the table ecosystem and is only deserialized at the edges. The any type is an
+ * extension to the SQL standard.
+ *
+ * <p>Compared to an {@link AnyType}, this type does not contain a {@link TypeSerializer} yet. The
+ * serializer will be generated from the enclosed {@link TypeInformation} but needs access to the
+ * {@link ExecutionConfig} of the current execution environment. Thus, this type is just a placeholder
+ * for the fully resolved {@link AnyType} returned by {@link #resolve(ExecutionConfig)}.
+ *
+ * <p>This type has no serializable string representation.
+ *
+ * <p>If no type information is supplied, generic type serialization for {@link Object} is used.
+ *
+ * @param <T> class described by the enclosed type information
+ */
+@PublicEvolving
+public final class TypeInformationAnyType<T> extends LogicalType {
+
+       private static final String FORMAT = "ANY(%s, ?)";
+
+       // class names accepted for input and output conversion in addition to the type class
+       private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+               byte[].class.getName(),
+               "org.apache.flink.table.dataformat.BinaryGeneric");
+
+       // fallback used by the no-argument constructor
+       private static final TypeInformation<?> DEFAULT_TYPE_INFO = Types.GENERIC(Object.class);
+
+       private final TypeInformation<T> typeInfo;
+
+       /**
+        * Creates a placeholder with explicit nullability.
+        *
+        * @param isNullable whether the type accepts {@code null}
+        * @param typeInfo type information to resolve later, must not be {@code null}
+        */
+       public TypeInformationAnyType(boolean isNullable, TypeInformation<T> typeInfo) {
+               super(isNullable, LogicalTypeRoot.ANY);
+               this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+       }
+
+       /**
+        * Creates a nullable placeholder for the given type information.
+        */
+       public TypeInformationAnyType(TypeInformation<T> typeInfo) {
+               this(true, typeInfo);
+       }
+
+       /**
+        * Creates a nullable placeholder backed by generic type information for {@link Object}.
+        */
+       @SuppressWarnings("unchecked")
+       public TypeInformationAnyType() {
+               this(true, (TypeInformation<T>) DEFAULT_TYPE_INFO);
+       }
+
+       /**
+        * Returns the enclosed type information.
+        */
+       public TypeInformation<T> getTypeInformation() {
+               return typeInfo;
+       }
+
+       /**
+        * Converts this placeholder into an {@link AnyType} with the same nullability, using a
+        * serializer created from the enclosed type information and the given configuration.
+        */
+       @Internal
+       public AnyType<T> resolve(ExecutionConfig config) {
+               final boolean nullable = isNullable();
+               final Class<T> typeClass = typeInfo.getTypeClass();
+               final TypeSerializer<T> serializer = typeInfo.createSerializer(config);
+               return new AnyType<>(nullable, typeClass, serializer);
+       }
+
+       @Override
+       public LogicalType copy(boolean isNullable) {
+               // the type information is shared with the copy; we must assume immutability here
+               return new TypeInformationAnyType<>(isNullable, typeInfo);
+       }
+
+       @Override
+       public String asSummaryString() {
+               final String className = typeInfo.getTypeClass().getName();
+               return withNullability(FORMAT, className);
+       }
+
+       @Override
+       public String asSerializableString() {
+               final String message =
+                       "An any type backed by type information has no serializable string representation. It " +
+                               "needs to be resolved into a proper any type.";
+               throw new TableException(message);
+       }
+
+       @Override
+       public boolean supportsInputConversion(Class<?> clazz) {
+               if (typeInfo.getTypeClass().isAssignableFrom(clazz)) {
+                       return true;
+               }
+               return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+       }
+
+       @Override
+       public boolean supportsOutputConversion(Class<?> clazz) {
+               if (clazz.isAssignableFrom(typeInfo.getTypeClass())) {
+                       return true;
+               }
+               return INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+       }
+
+       @Override
+       public Class<?> getDefaultConversion() {
+               return typeInfo.getTypeClass();
+       }
+
+       @Override
+       public List<LogicalType> getChildren() {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public <R> R accept(LogicalTypeVisitor<R> visitor) {
+               return visitor.visit(this);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               // same checks in the same order: null, exact class, base-class state, type information
+               final boolean comparable = o != null && getClass() == o.getClass() && super.equals(o);
+               return comparable && typeInfo.equals(((TypeInformationAnyType<?>) o).typeInfo);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(super.hashCode(), typeInfo);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 5531082..35e57ae 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -93,8 +93,12 @@ public abstract class EncodingUtils {
                return loadClass(qualifiedName, 
Thread.currentThread().getContextClassLoader());
        }
 
+       /**
+        * Encodes the given bytes with the basic Base64 encoder and returns the result as a string.
+        */
+       public static String encodeBytesToBase64(byte[] bytes) {
+               final java.util.Base64.Encoder encoder = java.util.Base64.getEncoder();
+               return encoder.encodeToString(bytes);
+       }
+
        public static String encodeStringToBase64(String string) {
-               return new 
String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8);
+               return encodeBytesToBase64(string.getBytes(UTF_8));
        }
 
        public static String decodeBase64ToString(String base64) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index ec452a8..e03e15d 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.types;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.types.logical.AnyType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -41,6 +47,7 @@ import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
 import org.apache.flink.table.types.logical.UserDefinedType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -454,6 +461,43 @@ public class LogicalTypesTest {
                assertFalse(nullType.supportsOutputConversion(int.class));
        }
 
+       @Test
+       public void testTypeInformationAnyType() {
+               // type under test and a counterpart that differs only in the second tuple field
+               final TypeInformationAnyType<?> tupleAnyType =
+                       new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.INT));
+               final TypeInformationAnyType<?> differentAnyType =
+                       new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG));
+
+               testEquality(tupleAnyType, differentAnyType);
+               testStringSummary(tupleAnyType, "ANY(org.apache.flink.api.java.tuple.Tuple2, ?)");
+               testNullability(tupleAnyType);
+               testJavaSerializability(tupleAnyType);
+
+               // input accepts the tuple class itself, output also accepts its super class
+               final Class[] inputClasses = new Class[]{Tuple2.class};
+               final Class[] outputClasses = new Class[]{Tuple.class};
+               testConversions(tupleAnyType, inputClasses, outputClasses);
+       }
+
+       @Test
+       public void testAnyType() {
+               // expected serializable form: class name followed by the Base64-encoded serializer snapshot
+               final String expectedSerializableString =
+                       "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
+                               "ADNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4AAATyxpo9cAA" +
+                               "AAAIAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuTG9naWNhbFR5cGVzVGVzdCRIdW1hbgEAAAA1AD" +
+                               "NvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4BAAAAOQAzb3JnL" +
+                               "mFwYWNoZS5mbGluay50YWJsZS50eXBlcy5Mb2dpY2FsVHlwZXNUZXN0JEh1bWFuAAAAAAApb3JnLmFwYWNo" +
+                               "ZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBAAAAKwApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWM" +
+                               "uR2VuZXJpY0RhdGEkQXJyYXkBAAAAtgBVb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMucn" +
+                               "VudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb1JlZ2lzdGVyZWRDbGFzcwAAAAEAWW9yZy5hcGFja" +
+                               "GUuZmxpbmsuYXBpLmphdmEudHlwZXV0aWxzLnJ1bnRpbWUua3J5by5TZXJpYWxpemVycyREdW1teUF2cm9L" +
+                               "cnlvU2VyaWFsaXplckNsYXNzAAAE8saaPXAAAAAAAAAE8saaPXAAAAAA)";
+               final String expectedSummaryString =
+                       "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, ...)";
+
+               // type under test and a type for a different class used for the inequality check
+               final AnyType<Human> humanType =
+                       new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig()));
+               final AnyType<User> userType =
+                       new AnyType<>(User.class, new KryoSerializer<>(User.class, new ExecutionConfig()));
+
+               testAll(
+                       humanType,
+                       expectedSerializableString,
+                       expectedSummaryString,
+                       new Class[]{Human.class, User.class}, // every User is Human
+                       new Class[]{Human.class},
+                       new LogicalType[]{},
+                       userType
+               );
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private static void testAll(

Reply via email to