This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dba11f7 [FLINK-12253][table-common] Add a DECIMAL type dba11f7 is described below commit dba11f70d06f504a2ac6ed3bbe1a019bc42f7d66 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Apr 30 17:05:26 2019 +0200 [FLINK-12253][table-common] Add a DECIMAL type --- .../flink/table/types/logical/DecimalType.java | 158 +++++++++++++++++++++ .../table/types/logical/LogicalTypeVisitor.java | 2 + .../apache/flink/table/types/LogicalTypesTest.java | 15 ++ 3 files changed, 175 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java new file mode 100644 index 0000000..017bcd4 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a decimal number with fixed precision and scale. + * + * <p>The serialized string representation is {@code DECIMAL(p, s)} where {@code p} is the number of + * digits in a number (=precision) and {@code s} is the number of digits to the right of the decimal + * point in a number (=scale). {@code p} must have a value between 1 and 38 (both inclusive). {@code s} + * must have a value between 0 and {@code p} (both inclusive). The default value for {@code p} is 10. + * The default value for {@code s} is 0. + */ +@PublicEvolving +public final class DecimalType extends LogicalType { + + private static final int MIN_PRECISION = 1; + + private static final int MAX_PRECISION = 38; + + private static final int DEFAULT_PRECISION = 10; + + private static final int MIN_SCALE = 0; + + private static final int DEFAULT_SCALE = 0; + + private static final String FORMAT = "DECIMAL(%d, %d)"; + + private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet( + BigDecimal.class.getName(), + "org.apache.flink.table.dataformat.Decimal"); + + private static final Class<?> DEFAULT_CONVERSION = BigDecimal.class; + + private final int precision; + + private final int scale; + + public DecimalType(boolean isNullable, int precision, int scale) { + super(isNullable, LogicalTypeRoot.DECIMAL); + if (precision < MIN_PRECISION || precision > MAX_PRECISION) { + throw new ValidationException( + String.format( + "Decimal precision must be between %d and %d (both inclusive).", + MIN_PRECISION, + MAX_PRECISION)); + } + if (scale < MIN_SCALE || scale > precision) { + throw new ValidationException( + String.format( + "Decimal scale must be between %d and the precision %d (both inclusive).", + MIN_SCALE, + precision)); + } + this.precision = precision; + this.scale = scale; + } + + public DecimalType(int precision, int scale) { + this(true, precision, scale); + } + + public DecimalType(int precision) { + this(precision, DEFAULT_SCALE); + } + + public DecimalType() { + this(DEFAULT_PRECISION); + } + + public int getPrecision() { + return precision; + } + + public int getScale() { + return scale; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new DecimalType(isNullable, precision, scale); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT, precision, scale); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + return INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List<LogicalType> getChildren() { + return Collections.emptyList(); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DecimalType that = (DecimalType) o; + return precision == that.precision && scale == that.scale; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), precision, scale); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java index c7192a9..2ec9ca5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java @@ -39,5 +39,7 @@ public interface LogicalTypeVisitor<R> { R visit(VarBinaryType varBinaryType); + R visit(DecimalType decimalType); + R visit(LogicalType other); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java index 2fe2f97..def5d76 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.types; import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; @@ -29,6 +30,7 @@ import org.apache.flink.util.InstantiationUtil; import org.junit.Assert; import org.junit.Test; +import java.math.BigDecimal; import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -106,6 +108,19 @@ public class LogicalTypesTest { ); } + @Test + public void testDecimalType() { + testAll( + new DecimalType(10, 2), + "DECIMAL(10, 2)", + "DECIMAL(10, 2)", + new Class[]{BigDecimal.class}, + new Class[]{BigDecimal.class}, + new LogicalType[]{}, + new DecimalType() + ); + } + // -------------------------------------------------------------------------------------------- private static void testAll(