This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new b22ed197 [FLINK-30423] Improve CastExecutor and CastExecutors b22ed197 is described below commit b22ed19740f427852f404d93570e3b0cff1b09ef Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Tue Jan 3 11:51:02 2023 +0800 [FLINK-30423] Improve CastExecutor and CastExecutors --- .../table/store/file/casting/CastExecutor.java | 14 +- .../table/store/file/casting/CastExecutors.java | 286 +++++++++------------ .../table/store/file/casting/CastExecutorTest.java | 151 ++++------- 3 files changed, 167 insertions(+), 284 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java index 74c0a2f1..3dbb04cc 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java @@ -18,10 +18,6 @@ package org.apache.flink.table.store.file.casting; -import org.apache.flink.table.api.TableException; - -import javax.annotation.Nonnull; - /** * Interface to model a function that performs the casting of a value from one type to another. * Copied from flink. @@ -30,11 +26,7 @@ import javax.annotation.Nonnull; * @param <OUT> Output internal type */ public interface CastExecutor<IN, OUT> { - /** - * Cast the input value. The output is null only and only if the input is null. The method - * throws an exception if something goes wrong when casting. - * - * @param value Input value. - */ - OUT cast(@Nonnull IN value) throws TableException; + + /** Cast the input value. */ + OUT cast(IN value); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java index bbbddb47..d46c68fc 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java @@ -45,6 +45,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; /** Cast executors for input type and output type. */ public class CastExecutors { + /** * Resolve a {@link CastExecutor} for the provided input type and target type. Returns null if * no rule can be resolved. @@ -65,209 +66,162 @@ public class CastExecutors { { switch (outputType.getTypeRoot()) { case TINYINT: - { - return value -> ((Number) value).byteValue(); - } + return value -> ((Number) value).byteValue(); case SMALLINT: - { - return value -> ((Number) value).shortValue(); - } + return value -> ((Number) value).shortValue(); case INTEGER: - { - return value -> ((Number) value).intValue(); - } + return value -> ((Number) value).intValue(); case BIGINT: - { - return value -> ((Number) value).longValue(); - } + return value -> ((Number) value).longValue(); case FLOAT: - { - return value -> ((Number) value).floatValue(); - } + return value -> ((Number) value).floatValue(); case DOUBLE: - { - return value -> ((Number) value).doubleValue(); - } + return value -> ((Number) value).doubleValue(); case DECIMAL: - { - final DecimalType decimalType = (DecimalType) outputType; - return value -> { - final Number number = (Number) value; - switch (inputType.getTypeRoot()) { - case TINYINT: - case SMALLINT: - case INTEGER: - case BIGINT: - { - return DecimalDataUtils.castFrom( - number.longValue(), - decimalType.getPrecision(), - decimalType.getScale()); - } - default: - { - return DecimalDataUtils.castFrom( - number.doubleValue(), - decimalType.getPrecision(), - decimalType.getScale()); - } - } - }; - } + final DecimalType decimalType = (DecimalType) outputType; + return value -> { + final Number number = (Number) value; + switch (inputType.getTypeRoot()) { + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + { + return DecimalDataUtils.castFrom( + number.longValue(), + decimalType.getPrecision(), + decimalType.getScale()); + } + default: + { + return DecimalDataUtils.castFrom( + number.doubleValue(), + decimalType.getPrecision(), + decimalType.getScale()); + } + } + }; default: - { - return null; - } + return null; } } case DECIMAL: { switch (outputType.getTypeRoot()) { case TINYINT: - { - return value -> - (byte) DecimalDataUtils.castToIntegral((DecimalData) value); - } + return value -> + (byte) DecimalDataUtils.castToIntegral((DecimalData) value); case SMALLINT: - { - return value -> - (short) - DecimalDataUtils.castToIntegral( - (DecimalData) value); - } + return value -> + (short) DecimalDataUtils.castToIntegral((DecimalData) value); case INTEGER: - { - return value -> - (int) DecimalDataUtils.castToIntegral((DecimalData) value); - } + return value -> + (int) DecimalDataUtils.castToIntegral((DecimalData) value); case BIGINT: - { - return value -> - DecimalDataUtils.castToIntegral((DecimalData) value); - } + return value -> DecimalDataUtils.castToIntegral((DecimalData) value); case FLOAT: - { - return value -> - (float) DecimalDataUtils.doubleValue((DecimalData) value); - } + return value -> + (float) DecimalDataUtils.doubleValue((DecimalData) value); case DOUBLE: - { - return value -> DecimalDataUtils.doubleValue((DecimalData) value); - } + return value -> DecimalDataUtils.doubleValue((DecimalData) value); case DECIMAL: - { - DecimalType decimalType = (DecimalType) outputType; - return value -> - DecimalDataUtils.castToDecimal( - (DecimalData) value, - decimalType.getPrecision(), - decimalType.getScale()); - } + DecimalType decimalType = (DecimalType) outputType; + return value -> + DecimalDataUtils.castToDecimal( + (DecimalData) value, + decimalType.getPrecision(), + decimalType.getScale()); default: - { - return null; - } + return null; } } case CHAR: case VARCHAR: - { - if (outputType.getTypeRoot() == CHAR || outputType.getTypeRoot() == VARCHAR) { - final boolean targetCharType = outputType.getTypeRoot() == CHAR; - final int targetLength = getStringLength(outputType); - return value -> { - StringData result; - String strVal = value.toString(); - BinaryStringData strData = BinaryStringData.fromString(strVal); - if (strData.numChars() > targetLength) { - result = - BinaryStringData.fromString( - strVal.substring(0, targetLength)); - } else { - if (strData.numChars() < targetLength) { - if (targetCharType) { - int padLength = targetLength - strData.numChars(); - BinaryStringData padString = - BinaryStringData.blankString(padLength); - result = BinaryStringDataUtil.concat(strData, padString); - } else { - result = strData; - } + if (outputType.getTypeRoot() == CHAR || outputType.getTypeRoot() == VARCHAR) { + final boolean targetCharType = outputType.getTypeRoot() == CHAR; + final int targetLength = getStringLength(outputType); + return value -> { + StringData result; + String strVal = value.toString(); + BinaryStringData strData = BinaryStringData.fromString(strVal); + if (strData.numChars() > targetLength) { + result = BinaryStringData.fromString(strVal.substring(0, targetLength)); + } else { + if (strData.numChars() < targetLength) { + if (targetCharType) { + int padLength = targetLength - strData.numChars(); + BinaryStringData padString = + BinaryStringData.blankString(padLength); + result = BinaryStringDataUtil.concat(strData, padString); } else { result = strData; } - } - - return result; - }; - } else if (outputType.getTypeRoot() == VARBINARY) { - final int targetLength = getBinaryLength(outputType); - return value -> { - byte[] byteArrayTerm = ((StringData) value).toBytes(); - if (byteArrayTerm.length <= targetLength) { - return byteArrayTerm; } else { - return Arrays.copyOf(byteArrayTerm, targetLength); + result = strData; } - }; - } - return null; + } + + return result; + }; + } else if (outputType.getTypeRoot() == VARBINARY) { + final int targetLength = getBinaryLength(outputType); + return value -> { + byte[] byteArrayTerm = ((StringData) value).toBytes(); + if (byteArrayTerm.length <= targetLength) { + return byteArrayTerm; + } else { + return Arrays.copyOf(byteArrayTerm, targetLength); + } + }; } + return null; case BINARY: - { - if (outputType.getTypeRoot() == BINARY) { - final int targetLength = getBinaryLength(outputType); - return value -> - ((((byte[]) value).length == targetLength) - ? value - : Arrays.copyOf((byte[]) value, targetLength)); - } - return null; + if (outputType.getTypeRoot() == BINARY) { + final int targetLength = getBinaryLength(outputType); + return value -> + ((((byte[]) value).length == targetLength) + ? value + : Arrays.copyOf((byte[]) value, targetLength)); } + return null; case TIMESTAMP_WITHOUT_TIME_ZONE: - { - switch (outputType.getTypeRoot()) { - case DATE: - { - return value -> - (int) - (((TimestampData) value).getMillisecond() - / DateTimeUtils.MILLIS_PER_DAY); - } - case TIMESTAMP_WITHOUT_TIME_ZONE: - { - return value -> - DateTimeUtils.truncate( - (TimestampData) value, - ((TimestampType) outputType).getPrecision()); - } - case TIME_WITHOUT_TIME_ZONE: - { - return value -> - (int) - (((TimestampData) value).getMillisecond() - % DateTimeUtils.MILLIS_PER_DAY); - } - default: - { - return null; - } - } + switch (outputType.getTypeRoot()) { + case DATE: + { + return value -> + (int) + (((TimestampData) value).getMillisecond() + / DateTimeUtils.MILLIS_PER_DAY); + } + case TIMESTAMP_WITHOUT_TIME_ZONE: + { + return value -> + DateTimeUtils.truncate( + (TimestampData) value, + ((TimestampType) outputType).getPrecision()); + } + case TIME_WITHOUT_TIME_ZONE: + { + return value -> + (int) + (((TimestampData) value).getMillisecond() + % DateTimeUtils.MILLIS_PER_DAY); + } + default: + { + return null; + } } case TIME_WITHOUT_TIME_ZONE: - { - if (outputType.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) { - return value -> - (int) - (((TimestampData) value).getMillisecond() - % DateTimeUtils.MILLIS_PER_DAY); - } - return null; + if (outputType.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) { + return value -> + (int) + (((TimestampData) value).getMillisecond() + % DateTimeUtils.MILLIS_PER_DAY); } + return null; default: - { - return null; - } + return null; } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java index a5afc7d3..ea5ffeeb 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java @@ -43,124 +43,79 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CastExecutor}. */ public class CastExecutorTest { + @Test public void testNumericToNumeric() { // byte to other numeric compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TinyIntType(false), new SmallIntType(false)), + CastExecutors.resolve(new TinyIntType(false), new SmallIntType(false)), (byte) 1, (short) 1); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TinyIntType(false), new IntType(false)), - (byte) 1, - 1); + CastExecutors.resolve(new TinyIntType(false), new IntType(false)), (byte) 1, 1); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TinyIntType(false), new BigIntType(false)), - (byte) 1, - 1L); + CastExecutors.resolve(new TinyIntType(false), new BigIntType(false)), (byte) 1, 1L); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TinyIntType(false), new FloatType(false)), - (byte) 1, - 1F); + CastExecutors.resolve(new TinyIntType(false), new FloatType(false)), (byte) 1, 1F); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TinyIntType(false), new DoubleType(false)), - (byte) 1, - 1D); + CastExecutors.resolve(new TinyIntType(false), new DoubleType(false)), (byte) 1, 1D); // short to other numeric compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new SmallIntType(false), new IntType(false)), - (short) 1, - 1); + CastExecutors.resolve(new SmallIntType(false), new IntType(false)), (short) 1, 1); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new SmallIntType(false), new BigIntType(false)), + CastExecutors.resolve(new SmallIntType(false), new BigIntType(false)), (short) 1, 1L); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new SmallIntType(false), new FloatType(false)), + CastExecutors.resolve(new SmallIntType(false), new FloatType(false)), (short) 1, 1F); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new SmallIntType(false), new DoubleType(false)), + CastExecutors.resolve(new SmallIntType(false), new DoubleType(false)), (short) 1, 1D); // int to other numeric - compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new IntType(false), new BigIntType(false)), - 1, - 1L); - compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new IntType(false), new FloatType(false)), - 1, - 1F); - compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new IntType(false), new DoubleType(false)), - 1, - 1D); + compareCastResult(CastExecutors.resolve(new IntType(false), new BigIntType(false)), 1, 1L); + compareCastResult(CastExecutors.resolve(new IntType(false), new FloatType(false)), 1, 1F); + compareCastResult(CastExecutors.resolve(new IntType(false), new DoubleType(false)), 1, 1D); // bigint to other numeric compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new BigIntType(false), new FloatType(false)), - 1L, - 1F); + CastExecutors.resolve(new BigIntType(false), new FloatType(false)), 1L, 1F); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new BigIntType(false), new DoubleType(false)), - 1L, - 1D); + CastExecutors.resolve(new BigIntType(false), new DoubleType(false)), 1L, 1D); // float to double compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new FloatType(false), new DoubleType(false)), - 1F, - 1D); + CastExecutors.resolve(new FloatType(false), new DoubleType(false)), 1F, 1D); } @Test public void testNumericToDecimal() { compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TinyIntType(false), new DecimalType(10, 2)), + CastExecutors.resolve(new TinyIntType(false), new DecimalType(10, 2)), (byte) 1, DecimalDataUtils.castFrom(1, 10, 2)); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new SmallIntType(false), new DecimalType(10, 2)), + CastExecutors.resolve(new SmallIntType(false), new DecimalType(10, 2)), (short) 1, DecimalDataUtils.castFrom(1, 10, 2)); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new IntType(false), new DecimalType(10, 2)), + CastExecutors.resolve(new IntType(false), new DecimalType(10, 2)), 1, DecimalDataUtils.castFrom(1, 10, 2)); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new BigIntType(false), new DecimalType(10, 2)), + CastExecutors.resolve(new BigIntType(false), new DecimalType(10, 2)), 1L, DecimalDataUtils.castFrom(1, 10, 2)); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new FloatType(false), new DecimalType(10, 2)), + CastExecutors.resolve(new FloatType(false), new DecimalType(10, 2)), 1.23456F, DecimalDataUtils.castFrom(1.23456D, 10, 2)); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new DoubleType(false), new DecimalType(10, 2)), + CastExecutors.resolve(new DoubleType(false), new DecimalType(10, 2)), 1.23456D, DecimalDataUtils.castFrom(1.23456D, 10, 2)); } @@ -168,13 +123,11 @@ public class CastExecutorTest { @Test public void testDecimalToDecimal() { compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new DecimalType(10, 4), new DecimalType(10, 2)), + CastExecutors.resolve(new DecimalType(10, 4), new DecimalType(10, 2)), DecimalDataUtils.castFrom(1.23456D, 10, 4), DecimalDataUtils.castFrom(1.23456D, 10, 2)); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new DecimalType(10, 2), new DecimalType(10, 4)), + CastExecutors.resolve(new DecimalType(10, 2), new DecimalType(10, 4)), DecimalDataUtils.castFrom(1.23456D, 10, 2), DecimalDataUtils.castFrom(1.2300D, 10, 4)); } @@ -182,13 +135,11 @@ public class CastExecutorTest { @Test public void testDecimalToNumeric() { compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new DecimalType(10, 4), new FloatType(false)), + CastExecutors.resolve(new DecimalType(10, 4), new FloatType(false)), DecimalDataUtils.castFrom(1.23456D, 10, 4), 1.2346F); compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new DecimalType(10, 2), new DoubleType(false)), + CastExecutors.resolve(new DecimalType(10, 2), new DoubleType(false)), DecimalDataUtils.castFrom(1.23456D, 10, 2), 1.23D); } @@ -197,57 +148,49 @@ public class CastExecutorTest { public void testStringToString() { // varchar(10) to varchar(5) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new VarCharType(10), new VarCharType(5)), + CastExecutors.resolve(new VarCharType(10), new VarCharType(5)), StringData.fromString("1234567890"), StringData.fromString("12345")); // varchar(10) to varchar(20) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new VarCharType(10), new VarCharType(20)), + CastExecutors.resolve(new VarCharType(10), new VarCharType(20)), StringData.fromString("1234567890"), StringData.fromString("1234567890")); // varchar(10) to char(5) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new VarCharType(10), new CharType(5)), + CastExecutors.resolve(new VarCharType(10), new CharType(5)), StringData.fromString("1234567890"), StringData.fromString("12345")); // varchar(10) to char(20) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new VarCharType(10), new CharType(20)), + CastExecutors.resolve(new VarCharType(10), new CharType(20)), StringData.fromString("1234567890"), StringData.fromString("1234567890 ")); // char(10) to varchar(5) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new CharType(10), new VarCharType(5)), + CastExecutors.resolve(new CharType(10), new VarCharType(5)), StringData.fromString("1234567890"), StringData.fromString("12345")); // char(10) to varchar(20) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new CharType(10), new VarCharType(20)), + CastExecutors.resolve(new CharType(10), new VarCharType(20)), StringData.fromString("12345678 "), StringData.fromString("12345678 ")); // char(10) to char(5) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new CharType(10), new CharType(5)), + CastExecutors.resolve(new CharType(10), new CharType(5)), StringData.fromString("12345678 "), StringData.fromString("12345")); // char(10) to char(20) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new CharType(10), new CharType(20)), + CastExecutors.resolve(new CharType(10), new CharType(20)), StringData.fromString("12345678 "), StringData.fromString("12345678 ")); } @@ -256,15 +199,13 @@ public class CastExecutorTest { public void testStringToBinary() { // string(10) to binary(5) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new VarCharType(10), new VarBinaryType(5)), + CastExecutors.resolve(new VarCharType(10), new VarBinaryType(5)), StringData.fromString("12345678"), "12345".getBytes()); // string(10) to binary(20) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new VarCharType(10), new VarBinaryType(20)), + CastExecutors.resolve(new VarCharType(10), new VarBinaryType(20)), StringData.fromString("12345678"), "12345678".getBytes()); } @@ -273,15 +214,13 @@ public class CastExecutorTest { public void testBinaryToBinary() { // binary(10) to binary(5) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new BinaryType(10), new BinaryType(5)), + CastExecutors.resolve(new BinaryType(10), new BinaryType(5)), "1234567890".getBytes(), "12345".getBytes()); // binary(10) to binary(20) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new BinaryType(10), new BinaryType(20)), + CastExecutors.resolve(new BinaryType(10), new BinaryType(20)), "12345678".getBytes(), new byte[] {49, 50, 51, 52, 53, 54, 55, 56, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}); } @@ -293,27 +232,25 @@ public class CastExecutorTest { // timestamp(5) to timestamp(2) compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TimestampType(5), new TimestampType(2)), + CastExecutors.resolve(new TimestampType(5), new TimestampType(2)), timestampData, DateTimeUtils.truncate(TimestampData.fromEpochMillis(mills), 2)); // timestamp to date compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TimestampType(5), new DateType()), + CastExecutors.resolve(new TimestampType(5), new DateType()), TimestampData.fromEpochMillis(mills), (int) (mills / DateTimeUtils.MILLIS_PER_DAY)); // timestamp to time compareCastResult( - (CastExecutor<Object, Object>) - CastExecutors.resolve(new TimestampType(5), new TimeType(2)), + CastExecutors.resolve(new TimestampType(5), new TimeType(2)), TimestampData.fromEpochMillis(mills), (int) (mills % DateTimeUtils.MILLIS_PER_DAY)); } - private void compareCastResult(CastExecutor<Object, Object> cast, Object input, Object output) { - assertThat(cast.cast(input)).isEqualTo(output); + @SuppressWarnings("rawtypes") + private void compareCastResult(CastExecutor<?, ?> cast, Object input, Object output) { + assertThat(((CastExecutor) cast).cast(input)).isEqualTo(output); } }