This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8ffdf72 [FLINK-12253][table-common] Add a VARCHAR type 8ffdf72 is described below commit 8ffdf72827552e088466897b52504fe49bba412e Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Apr 30 16:23:18 2019 +0200 [FLINK-12253][table-common] Add a VARCHAR type --- .../table/types/logical/LogicalTypeVisitor.java | 2 + .../flink/table/types/logical/VarCharType.java | 136 +++++++++++++++++++++ .../apache/flink/table/types/LogicalTypesTest.java | 14 +++ 3 files changed, 152 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java index 865a264..7f78149 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java @@ -31,5 +31,7 @@ public interface LogicalTypeVisitor<R> { R visit(CharType charType); + R visit(VarCharType varCharType); + R visit(LogicalType other); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java new file mode 100644 index 0000000..c955c0a --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a variable-length character string. + * + * <p>The serialized string representation is {@code VARCHAR(n)} where {@code n} is the maximum + * number of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE} (both + * inclusive). If no length is specified, {@code n} is equal to 1. + * + * <p>A conversion from and to {@code byte[]} assumes UTF-8 encoding. + */ +@PublicEvolving +public final class VarCharType extends LogicalType { + + private static final int MIN_LENGTH = 1; + + private static final int MAX_LENGTH = Integer.MAX_VALUE; + + private static final int DEFAULT_LENGTH = 1; + + private static final String FORMAT = "VARCHAR(%d)"; + + private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet( + String.class.getName(), + byte[].class.getName(), + "org.apache.flink.table.dataformat.BinaryString"); + + private static final Class<?> DEFAULT_CONVERSION = String.class; + + private final int length; + + public VarCharType(boolean isNullable, int length) { + super(isNullable, LogicalTypeRoot.VARCHAR); + if (length < MIN_LENGTH) { + throw new ValidationException( + String.format( + "Variable character string length must be between %d and %d (both inclusive).", + MIN_LENGTH, + MAX_LENGTH)); + } + this.length = length; + } + + public VarCharType(int length) { + this(true, length); + } + + public VarCharType() { + this(DEFAULT_LENGTH); + } + + public int getLength() { + return length; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new VarCharType(isNullable, length); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT, length); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List<LogicalType> getChildren() { + return Collections.emptyList(); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + VarCharType that = (VarCharType) o; + return length == that.length; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), length); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java index 63ec74e..f2a1e48 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.types; import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.util.InstantiationUtil; import org.junit.Assert; @@ -50,6 +51,19 @@ public class LogicalTypesTest { ); } + @Test + public void testVarCharType() { + testAll( + new VarCharType(33), + "VARCHAR(33)", + "VARCHAR(33)", + new Class[]{String.class, byte[].class}, + new Class[]{String.class, byte[].class}, + new LogicalType[]{}, + new VarCharType(12) + ); + } + // -------------------------------------------------------------------------------------------- private static void testAll(