This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 066fadf [FLINK-12253][table-common] Add a MAP type 066fadf is described below commit 066fadf3f369074d1b95405b7cc6c078766ffd09 Author: Timo Walther <twal...@apache.org> AuthorDate: Thu May 2 11:49:49 2019 +0200 [FLINK-12253][table-common] Add a MAP type --- .../table/types/logical/LogicalTypeVisitor.java | 2 + .../apache/flink/table/types/logical/MapType.java | 136 +++++++++++++++++++++ .../apache/flink/table/types/LogicalTypesTest.java | 14 +++ 3 files changed, 152 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java index 33a496a..79dc207 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java @@ -71,5 +71,7 @@ public interface LogicalTypeVisitor<R> { R visit(MultisetType multisetType); + R visit(MapType mapType); + R visit(LogicalType other); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/MapType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/MapType.java new file mode 100644 index 0000000..4d77265 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/MapType.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of an associative array that maps keys (including {@code NULL}) to values (including + * {@code NULL}). A map cannot contain duplicate keys; each key can map to at most one value. There + * is no restriction of key types; it is the responsibility of the user to ensure uniqueness. The map + * type is an extension to the SQL standard. + * + * <p>The serialized string representation is {@code MAP<kt, vt>} where {@code kt} is the logical type + * of the key elements and {@code vt} is the logical type of the value elements. + */ +@PublicEvolving +public final class MapType extends LogicalType { + + private static final String FORMAT = "MAP<%s, %s>"; + + private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet( + Map.class.getName(), + "org.apache.flink.table.dataformat.BinaryMap"); + + private static final Class<?> DEFAULT_CONVERSION = Map.class; + + private final LogicalType keyType; + + private final LogicalType valueType; + + public MapType(boolean isNullable, LogicalType keyType, LogicalType valueType) { + super(isNullable, LogicalTypeRoot.MAP); + this.keyType = Preconditions.checkNotNull(keyType, "Key type must not be null."); + this.valueType = Preconditions.checkNotNull(valueType, "Value type must not be null."); + } + + public MapType(LogicalType keyType, LogicalType valueType) { + this(true, keyType, valueType); + } + + public LogicalType getKeyType() { + return keyType; + } + + public LogicalType getValueType() { + return valueType; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new MapType(isNullable, keyType.copy(), valueType.copy()); + } + + @Override + public String asSummaryString() { + return withNullability(FORMAT, + keyType.asSummaryString(), + valueType.asSummaryString()); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT, + keyType.asSerializableString(), + valueType.asSerializableString()); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List<LogicalType> getChildren() { + return Collections.unmodifiableList(Arrays.asList(keyType, valueType)); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + MapType mapType = (MapType) o; + return keyType.equals(mapType.keyType) && valueType.equals(mapType.valueType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), keyType, valueType); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java index 1e621d7..2fe082d 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java @@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.TimeType; @@ -356,6 +357,19 @@ public class LogicalTypesTest { ); } + @Test + public void testMapType() { + testAll( + new MapType(new VarCharType(20), new TimestampType()), + "MAP<VARCHAR(20), TIMESTAMP(6)>", + "MAP<VARCHAR(20), TIMESTAMP(6)>", + new Class[]{Map.class}, + new Class[]{Map.class}, + new LogicalType[]{new VarCharType(20), new TimestampType()}, + new MapType(new VarCharType(99), new TimestampType()) + ); + } + // -------------------------------------------------------------------------------------------- private static void testAll(