This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0426d8c7af0191f75e6aaa4696b3358de059dc67 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Mon Nov 15 13:40:34 2021 +0100 [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules Signed-off-by: slinkydeveloper <francescogu...@gmail.com> --- .../apache/flink/table/utils/DateTimeUtils.java | 5 +- .../AbstractExpressionCodeGeneratorCastRule.java | 9 +- .../functions/casting/CastRuleProvider.java | 8 + .../CodeGeneratedExpressionCastExecutor.java | 1 - .../functions/casting/StringToBinaryCastRule.java | 50 +++++ .../functions/casting/StringToBooleanCastRule.java | 55 ++++++ .../functions/casting/StringToDateCastRule.java | 55 ++++++ .../functions/casting/StringToDecimalCastRule.java | 63 ++++++ .../casting/StringToNumericPrimitiveCastRule.java | 78 ++++++++ .../functions/casting/StringToTimeCastRule.java | 58 ++++++ .../casting/StringToTimestampCastRule.java | 63 ++++++ .../planner/codegen/calls/BuiltInMethods.scala | 68 +++++-- .../planner/codegen/calls/ScalarOperatorGens.scala | 105 ---------- .../planner/functions/casting/CastRulesTest.java | 211 ++++++++++++++++++++- .../table/data/binary/BinaryStringDataUtil.java | 169 ++++++++++------- .../flink/table/data/BinaryStringDataTest.java | 41 ++-- 16 files changed, 826 insertions(+), 213 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java index fdd715b..06fc883 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java @@ -317,6 +317,7 @@ public class DateTimeUtils { // -------------------------------------------------------------------------------------------- // String --> Timestamp conversion // 
-------------------------------------------------------------------------------------------- + public static TimestampData toTimestampData(String dateStr) { int length = dateStr.length(); String format; @@ -411,7 +412,7 @@ public class DateTimeUtils { * @param format date time string format * @param tz the time zone */ - public static Long toTimestamp(String dateStr, String format, TimeZone tz) { + private static Long toTimestamp(String dateStr, String format, TimeZone tz) { SimpleDateFormat formatter = FORMATTER_CACHE.get(format); formatter.setTimeZone(tz); try { @@ -1717,7 +1718,7 @@ public class DateTimeUtils { return timeStringToUnixDate(v, 0); } - public static Integer timeStringToUnixDate(String v, int start) { + private static Integer timeStringToUnixDate(String v, int start) { final int colon1 = v.indexOf(':', start); // timezone hh:mm:ss[.ssssss][[+|-]hh:mm:ss] // refer https://www.w3.org/TR/NOTE-datetime diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java index aa0a50b..6700a7e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java @@ -73,7 +73,14 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT> box( generateExpression( createCodeGeneratorCastRuleContext(context), - unbox(inputArgumentName, inputLogicalType), + unbox( + // We need the casting because the rules uses the + // concrete classes (e.g. 
StringData and + // BinaryStringData) + cast( + boxedTypeTermForType(inputLogicalType), + inputArgumentName), + inputLogicalType), inputLogicalType, targetLogicalType), targetLogicalType)); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java index b098191..cb5dfa2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java @@ -58,6 +58,14 @@ public class CastRuleProvider { .addRule(MapAndMultisetToStringCastRule.INSTANCE) .addRule(RowToStringCastRule.INSTANCE) .addRule(RawToStringCastRule.INSTANCE) + // From string rules + .addRule(StringToBooleanCastRule.INSTANCE) + .addRule(StringToDecimalCastRule.INSTANCE) + .addRule(StringToNumericPrimitiveCastRule.INSTANCE) + .addRule(StringToDateCastRule.INSTANCE) + .addRule(StringToTimeCastRule.INSTANCE) + .addRule(StringToTimestampCastRule.INSTANCE) + .addRule(StringToBinaryCastRule.INSTANCE) // Collection rules .addRule(ArrayToArrayCastRule.INSTANCE) // Special rules diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java index f39089a..7c361ac 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java @@ -21,7 +21,6 @@ package org.apache.flink.table.planner.functions.casting; import 
org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.utils.CastExecutor; -import org.apache.flink.util.FlinkRuntimeException; import org.codehaus.janino.ExpressionEvaluator; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java new file mode 100644 index 0000000..2a70de4 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall; + +/** + * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#BINARY_STRING} cast rule. 
+ */ +class StringToBinaryCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, byte[]> { + + static final StringToBinaryCastRule INSTANCE = new StringToBinaryCastRule(); + + private StringToBinaryCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeFamily.BINARY_STRING) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + return methodCall(inputTerm, "toBytes"); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java new file mode 100644 index 0000000..b8730eb --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BOOLEAN; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; + +/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#BOOLEAN} cast rule. */ +class StringToBooleanCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Boolean> { + + static final StringToBooleanCastRule INSTANCE = new StringToBooleanCastRule(); + + private StringToBooleanCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeRoot.BOOLEAN) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + return staticCall(STRING_DATA_TO_BOOLEAN(), inputTerm); + } + + @Override + public boolean canFail() { + return true; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java new file mode 100644 index 0000000..2a31f22 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DATE; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; + +/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DATE} cast rule. 
*/ +class StringToDateCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> { + + static final StringToDateCastRule INSTANCE = new StringToDateCastRule(); + + private StringToDateCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeRoot.DATE) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + return staticCall(STRING_DATA_TO_DATE(), inputTerm); + } + + @Override + public boolean canFail() { + return true; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java new file mode 100644 index 0000000..77108d8 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DECIMAL; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; + +/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DECIMAL} cast rule. */ +class StringToDecimalCastRule + extends AbstractExpressionCodeGeneratorCastRule<StringData, DecimalData> { + + static final StringToDecimalCastRule INSTANCE = new StringToDecimalCastRule(); + + private StringToDecimalCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeRoot.DECIMAL) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + final DecimalType targetDecimalType = (DecimalType) targetLogicalType; + return staticCall( + STRING_DATA_TO_DECIMAL(), + inputTerm, + targetDecimalType.getPrecision(), + targetDecimalType.getScale()); + } + + @Override + public boolean canFail() { + return true; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java new file mode 100644 index 0000000..cec9ca8 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java @@ 
-0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BYTE; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DOUBLE; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_FLOAT; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_INT; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_LONG; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_SHORT; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; + +/** + * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#INTEGER_NUMERIC} and + * {@link LogicalTypeFamily#APPROXIMATE_NUMERIC} cast rule. 
+ */ +class StringToNumericPrimitiveCastRule + extends AbstractExpressionCodeGeneratorCastRule<StringData, Number> { + + static final StringToNumericPrimitiveCastRule INSTANCE = new StringToNumericPrimitiveCastRule(); + + private StringToNumericPrimitiveCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeFamily.INTEGER_NUMERIC) + .target(LogicalTypeFamily.APPROXIMATE_NUMERIC) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + switch (targetLogicalType.getTypeRoot()) { + case TINYINT: + return staticCall(STRING_DATA_TO_BYTE(), inputTerm); + case SMALLINT: + return staticCall(STRING_DATA_TO_SHORT(), inputTerm); + case INTEGER: + return staticCall(STRING_DATA_TO_INT(), inputTerm); + case BIGINT: + return staticCall(STRING_DATA_TO_LONG(), inputTerm); + case FLOAT: + return staticCall(STRING_DATA_TO_FLOAT(), inputTerm); + case DOUBLE: + return staticCall(STRING_DATA_TO_DOUBLE(), inputTerm); + } + throw new IllegalArgumentException("This is a bug. Please file an issue."); + } + + @Override + public boolean canFail() { + return true; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java new file mode 100644 index 0000000..4c14b78 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIME; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; + +/** + * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} cast + * rule. 
+ */ +class StringToTimeCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> { + + static final StringToTimeCastRule INSTANCE = new StringToTimeCastRule(); + + private StringToTimeCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + return staticCall(STRING_DATA_TO_TIME(), inputTerm); + } + + @Override + public boolean canFail() { + return true; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java new file mode 100644 index 0000000..96e40b3 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.casting; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP_WITH_ZONE; +import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; + +/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#TIMESTAMP} cast rule. */ +class StringToTimestampCastRule + extends AbstractExpressionCodeGeneratorCastRule<StringData, TimestampData> { + + static final StringToTimestampCastRule INSTANCE = new StringToTimestampCastRule(); + + private StringToTimestampCastRule() { + super( + CastRulePredicate.builder() + .input(LogicalTypeFamily.CHARACTER_STRING) + .target(LogicalTypeFamily.TIMESTAMP) + .build()); + } + + @Override + public String generateExpression( + CodeGeneratorCastRule.Context context, + String inputTerm, + LogicalType inputLogicalType, + LogicalType targetLogicalType) { + if (targetLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { + return staticCall(STRING_DATA_TO_TIMESTAMP(), inputTerm); + } + + return staticCall( + STRING_DATA_TO_TIMESTAMP_WITH_ZONE(), inputTerm, context.getSessionTimeZoneTerm()); + } + + @Override + public boolean canFail() { + return true; + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala index 15132b0..308826d 100644 --- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala @@ -22,14 +22,14 @@ import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, TimestampData import org.apache.flink.table.runtime.functions._ import org.apache.flink.table.utils.DateTimeUtils import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange - import org.apache.calcite.linq4j.tree.Types import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions} import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior} -import org.apache.flink.table.data.binary.BinaryStringData +import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil} import java.lang.reflect.Method import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort} +import java.time.ZoneId import java.util.TimeZone object BuiltInMethods { @@ -402,16 +402,6 @@ object BuiltInMethods { "toTimestampData", classOf[String], classOf[String]) - val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod( - classOf[DateTimeUtils], - "toTimestamp", - classOf[String], classOf[TimeZone]) - - val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod( - classOf[DateTimeUtils], - "toTimestamp", - classOf[String], classOf[String], classOf[TimeZone]) - val TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_DATE = Types.lookupMethod( classOf[DateTimeUtils], "timestampWithLocalZoneToDate", @@ -547,6 +537,60 @@ object BuiltInMethods { val BINARY_STRING_DATA_FROM_STRING = Types.lookupMethod(classOf[BinaryStringData], "fromString", classOf[String]) + val STRING_DATA_TO_BOOLEAN = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toBoolean", + classOf[BinaryStringData]) + + val STRING_DATA_TO_DECIMAL = Types.lookupMethod( + 
classOf[BinaryStringDataUtil], + "toDecimal", + classOf[BinaryStringData], + classOf[Int], + classOf[Int]) + + val STRING_DATA_TO_LONG = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toLong", + classOf[BinaryStringData]) + + val STRING_DATA_TO_INT = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toInt", + classOf[BinaryStringData]) + + val STRING_DATA_TO_SHORT = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toShort", + classOf[BinaryStringData]) + + val STRING_DATA_TO_BYTE = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toByte", + classOf[BinaryStringData]) + + val STRING_DATA_TO_FLOAT = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toFloat", + classOf[BinaryStringData]) + + val STRING_DATA_TO_DOUBLE = Types.lookupMethod( + classOf[BinaryStringDataUtil], + "toDouble", + classOf[BinaryStringData]) + + val STRING_DATA_TO_DATE = Types.lookupMethod( + classOf[BinaryStringDataUtil], "toDate", classOf[BinaryStringData]) + + val STRING_DATA_TO_TIME = Types.lookupMethod( + classOf[BinaryStringDataUtil], "toTime", classOf[BinaryStringData]) + + val STRING_DATA_TO_TIMESTAMP = Types.lookupMethod( + classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData]) + + val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod( + classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData], classOf[TimeZone]) + // DecimalData functions val DECIMAL_TO_DECIMAL = Types.lookupMethod( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala index 0cb0bea..8554e4f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala @@ -1094,111 
+1094,6 @@ object ScalarOperatorGens { operandTerm => s"$operandTerm.toBytes($serTerm)" } - // String -> Boolean - case (VARCHAR | CHAR, BOOLEAN) => - val castedExpression = generateUnaryOperatorIfNotNull( - ctx, - targetType, - operand, - resultNullable = true) { - operandTerm => s"$BINARY_STRING_UTIL.toBooleanSQL($operandTerm)" - } - val resultTerm = newName("primitiveCastResult") - castedExpression.copy( - resultTerm = resultTerm, - code = - s""" - |${castedExpression.code} - |boolean $resultTerm = Boolean.TRUE.equals(${castedExpression.resultTerm}); - |""".stripMargin - ) - - // String -> NUMERIC TYPE (not Character) - case (VARCHAR | CHAR, _) - if isNumeric(targetType) => - targetType match { - case dt: DecimalType => - generateUnaryOperatorIfNotNull(ctx, targetType, operand) { operandTerm => - s"$BINARY_STRING_UTIL.toDecimal($operandTerm, ${dt.getPrecision}, ${dt.getScale})" - } - case _ => - val methodName = targetType.getTypeRoot match { - case TINYINT => "toByte" - case SMALLINT => "toShort" - case INTEGER => "toInt" - case BIGINT => "toLong" - case DOUBLE => "toDouble" - case FLOAT => "toFloat" - case _ => null - } - assert(methodName != null, "Unexpected data type.") - generateUnaryOperatorIfNotNull( - ctx, - targetType, - operand, - resultNullable = true) { - operandTerm => s"($BINARY_STRING_UTIL.$methodName($operandTerm.trim()))" - } - } - - // String -> Date - case (VARCHAR | CHAR, DATE) => - generateUnaryOperatorIfNotNull( - ctx, - targetType, - operand, - resultNullable = true) { - operandTerm => - s"${qualifyMethod(BuiltInMethods.STRING_TO_DATE)}($operandTerm.toString())" - } - - // String -> Time - case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) => - generateUnaryOperatorIfNotNull( - ctx, - targetType, - operand, - resultNullable = true) { - operandTerm => - s"${qualifyMethod(BuiltInMethods.STRING_TO_TIME)}($operandTerm.toString())" - } - - // String -> Timestamp - case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => - 
generateUnaryOperatorIfNotNull( - ctx, - targetType, - operand, - resultNullable = true) { - operandTerm => - s""" - |${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString()) - """.stripMargin - } - - case (VARCHAR | CHAR, TIMESTAMP_WITH_LOCAL_TIME_ZONE) => - generateCallWithStmtIfArgsNotNull( - ctx, targetType, Seq(operand), resultNullable = true) { operands => - val zone = ctx.addReusableSessionTimeZone() - val method = qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP_TIME_ZONE) - val toTimestampResultName = newName("toTimestampResult") - // this method call might return null - val stmt = s"Long $toTimestampResultName = $method(${operands.head}.toString(), $zone);" - val result = - s""" - |($toTimestampResultName == null ? - | null : - | $TIMESTAMP_DATA.fromEpochMillis($toTimestampResultName)) - |""".stripMargin - (stmt, result) - } - - // String -> binary - case (VARCHAR | CHAR, VARBINARY | BINARY) => - generateUnaryOperatorIfNotNull(ctx, targetType, operand) { - operandTerm => s"$operandTerm.toBytes()" - } - // Note: SQL2003 $6.12 - casting is not allowed between boolean and numeric types. // Calcite does not allow it either. 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java index 3652787..127ece8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.functions.casting; import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; @@ -80,6 +81,7 @@ import static org.apache.flink.table.api.DataTypes.TINYINT; import static org.apache.flink.table.api.DataTypes.VARBINARY; import static org.apache.flink.table.api.DataTypes.VARCHAR; import static org.apache.flink.table.api.DataTypes.YEAR; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -90,9 +92,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; */ class CastRulesTest { + private static final ZoneId CET = ZoneId.of("CET"); + private static final CastRule.Context CET_CONTEXT = - CastRule.Context.create( - ZoneId.of("CET"), Thread.currentThread().getContextClassLoader()); + CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader()); private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5; private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5; @@ -125,6 +128,12 @@ class CastRulesTest { return Stream.of( CastTestSpecBuilder.testCastTo(TINYINT()) 
.fromCase(TINYINT(), null, null) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase(STRING(), StringData.fromString("1.234"), (byte) 1) + .fromCase(STRING(), StringData.fromString("123"), (byte) 123) + .fail(STRING(), StringData.fromString("-130"), TableException.class) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -151,6 +160,12 @@ class CastRulesTest { .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (byte) -123), CastTestSpecBuilder.testCastTo(SMALLINT()) .fromCase(SMALLINT(), null, null) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase(STRING(), StringData.fromString("1.234"), (short) 1) + .fromCase(STRING(), StringData.fromString("123"), (short) 123) + .fail(STRING(), StringData.fromString("-32769"), TableException.class) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -187,6 +202,15 @@ class CastRulesTest { .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (short) -123) .fromCase(DOUBLE(), 123456.7890d, (short) -7616), CastTestSpecBuilder.testCastTo(INT()) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase(STRING(), StringData.fromString("1.234"), 1) + .fromCase(STRING(), StringData.fromString("123"), 123) + .fail( + STRING(), + StringData.fromString("-3276913443134"), + TableException.class) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -229,6 +253,13 @@ class CastRulesTest { .fromCase(INTERVAL(DAY(), SECOND()), 123L, 123), 
CastTestSpecBuilder.testCastTo(BIGINT()) .fromCase(BIGINT(), null, null) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase(STRING(), StringData.fromString("1.234"), 1L) + .fromCase(STRING(), StringData.fromString("123"), 123L) + .fromCase( + STRING(), StringData.fromString("-3276913443134"), -3276913443134L) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -266,6 +297,13 @@ class CastRulesTest { .fromCase(DOUBLE(), 9234567891.12345d, 9234567891L), CastTestSpecBuilder.testCastTo(FLOAT()) .fromCase(FLOAT(), null, null) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase(STRING(), StringData.fromString("1.234"), 1.234f) + .fromCase(STRING(), StringData.fromString("123"), 123.0f) + .fromCase( + STRING(), StringData.fromString("-3276913443134"), -3.27691351E12f) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -307,6 +345,15 @@ class CastRulesTest { .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.23923451E12f), CastTestSpecBuilder.testCastTo(DOUBLE()) .fromCase(DOUBLE(), null, null) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase(STRING(), StringData.fromString("1.234"), 1.234d) + .fromCase(STRING(), StringData.fromString("123"), 123.0d) + .fromCase( + STRING(), + StringData.fromString("-3276913443134"), + -3.276913443134E12d) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -349,6 +396,92 @@ class CastRulesTest { 
.fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, DEFAULT_POSITIVE_DOUBLE) .fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, DEFAULT_NEGATIVE_DOUBLE) .fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.2392345678911235E12d), + CastTestSpecBuilder.testCastTo(DATE()) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fromCase( + STRING(), + StringData.fromString("123"), + DateTimeUtils.localDateToUnixDate(LocalDate.of(123, 1, 1))) + .fromCase( + STRING(), + StringData.fromString("2021-09-27"), + DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27))) + .fromCase( + STRING(), + StringData.fromString("2021-09-27 12:34:56.123456789"), + DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27))) + .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class), + CastTestSpecBuilder.testCastTo(TIME()) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fromCase( + STRING(), + StringData.fromString("23"), + DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 0, 0))) + .fromCase( + STRING(), + StringData.fromString("23:45"), + DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 45, 0))) + .fail(STRING(), StringData.fromString("2021-09-27"), TableException.class) + .fail( + STRING(), + StringData.fromString("2021-09-27 12:34:56"), + TableException.class) + .fromCase( + STRING(), + StringData.fromString("12:34:56.123456789"), + DateTimeUtils.localTimeToUnixDate( + LocalTime.of(12, 34, 56, 123_000_000))) + .fail( + STRING(), + StringData.fromString("2021-09-27 12:34:56.123456789"), + TableException.class), + CastTestSpecBuilder.testCastTo(TIMESTAMP(9)) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("123"), TableException.class) + .fromCase( + 
STRING(), + StringData.fromString("2021-09-27"), + TimestampData.fromLocalDateTime( + LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0))) + .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class) + .fromCase( + STRING(), + StringData.fromString("2021-09-27 12:34:56.123456789"), + TimestampData.fromLocalDateTime( + LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123456789))), + CastTestSpecBuilder.testCastTo(TIMESTAMP_LTZ(9)) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("123"), TableException.class) + .fromCase( + STRING(), + CET_CONTEXT, + StringData.fromString("2021-09-27"), + TimestampData.fromInstant( + LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0) + .atZone(CET) + .toInstant())) + .fromCase( + STRING(), + CET_CONTEXT, + StringData.fromString("2021-09-27 12:34:56.123"), + TimestampData.fromInstant( + LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123000000) + .atZone(CET) + .toInstant())) + // https://issues.apache.org/jira/browse/FLINK-24446 Fractional seconds are + // lost + .fromCase( + STRING(), + CET_CONTEXT, + StringData.fromString("2021-09-27 12:34:56.123456789"), + TimestampData.fromInstant( + LocalDateTime.of(2021, 9, 27, 12, 34, 56, 0) + .atZone(CET) + .toInstant())), CastTestSpecBuilder.testCastTo(STRING()) .fromCase(STRING(), null, null) .fromCase( @@ -478,7 +611,60 @@ class CastRulesTest { RawValueData.fromObject( LocalDateTime.parse("2020-11-11T18:08:01.123")), StringData.fromString("2020-11-11T18:08:01.123")), + CastTestSpecBuilder.testCastTo(BOOLEAN()) + .fromCase(BOOLEAN(), null, null) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fromCase(CHAR(4), StringData.fromString("true"), true) + .fromCase(VARCHAR(5), StringData.fromString("FalsE"), false) + .fail(STRING(), StringData.fromString("Apache Flink"), TableException.class) + .fromCase(STRING(), StringData.fromString("TRUE"), true) + 
.fail(STRING(), StringData.fromString(""), TableException.class), + CastTestSpecBuilder.testCastTo(BINARY(2)) + .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111}) + .fromCase( + VARCHAR(5), + StringData.fromString("Flink"), + new byte[] {70, 108, 105, 110, 107}) + // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2 + // bytes + .fromCase( + STRING(), + StringData.fromString("Apache"), + new byte[] {65, 112, 97, 99, 104, 101}), + CastTestSpecBuilder.testCastTo(VARBINARY(4)) + .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111}) + .fromCase( + VARCHAR(5), + StringData.fromString("Flink"), + new byte[] {70, 108, 105, 110, 107}) + // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 4 + // bytes + .fromCase( + STRING(), + StringData.fromString("Apache"), + new byte[] {65, 112, 97, 99, 104, 101}), + CastTestSpecBuilder.testCastTo(BYTES()) + .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111}) + .fromCase( + VARCHAR(5), + StringData.fromString("Flink"), + new byte[] {70, 108, 105, 110, 107}) + .fromCase( + STRING(), + StringData.fromString("Apache"), + new byte[] {65, 112, 97, 99, 104, 101}), CastTestSpecBuilder.testCastTo(DECIMAL(5, 3)) + .fail(CHAR(3), StringData.fromString("foo"), TableException.class) + .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class) + .fail(STRING(), StringData.fromString("Apache"), TableException.class) + .fromCase( + STRING(), + StringData.fromString("1.234"), + DecimalData.fromBigDecimal(new BigDecimal("1.234"), 5, 3)) + .fromCase( + STRING(), + StringData.fromString("1.2"), + DecimalData.fromBigDecimal(new BigDecimal("1.200"), 5, 3)) .fromCase( DECIMAL(4, 3), DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3), @@ -641,12 +827,21 @@ class CastRulesTest { this.inputTypes.add(dataType); this.assertionExecutors.add( executor -> { - assertEquals(target, executor.cast(src)); - // Run twice to make sure rules are
reusable without causing issues - assertEquals( - target, - executor.cast(src), - "Error when reusing the rule. Perhaps there is some state that needs to be reset"); + if (target instanceof byte[]) { + assertArrayEquals((byte[]) target, (byte[]) executor.cast(src)); + // Run twice to make sure rules are reusable without causing issues + assertArrayEquals( + (byte[]) target, + (byte[]) executor.cast(src), + "Error when reusing the rule. Perhaps there is some state that needs to be reset"); + } else { + assertEquals(target, executor.cast(src)); + // Run twice to make sure rules are reusable without causing issues + assertEquals( + target, + executor.cast(src), + "Error when reusing the rule. Perhaps there is some state that needs to be reset"); + } }); this.descriptions.add("{" + src + " => " + target + "}"); this.castContexts.add(castContext); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java index 2aebca5..e534337 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java @@ -18,18 +18,24 @@ package org.apache.flink.table.data.binary; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.runtime.util.SegmentsUtil; import org.apache.flink.table.runtime.util.StringUtf8Utils; +import org.apache.flink.table.utils.DateTimeUtils; import org.apache.flink.table.utils.EncodingUtils; import java.math.BigDecimal; import java.security.MessageDigest; import 
java.security.NoSuchAlgorithmException; +import java.time.DateTimeException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.TimeZone; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -144,12 +150,16 @@ public class BinaryStringDataUtil { return substrings.toArray(new BinaryStringData[0]); } - /** Decide boolean representation of a string. */ - public static Boolean toBooleanSQL(BinaryStringData str) { + /** Parse a {@link StringData} to boolean. */ + public static boolean toBoolean(BinaryStringData str) throws TableException { BinaryStringData lowerCase = str.toLowerCase(); - return TRUE_STRINGS.contains(lowerCase) - ? Boolean.TRUE - : (FALSE_STRINGS.contains(lowerCase) ? Boolean.FALSE : null); + if (TRUE_STRINGS.contains(lowerCase)) { + return true; + } + if (FALSE_STRINGS.contains(lowerCase)) { + return false; + } + throw new TableException("Cannot parse '" + str + "' as BOOLEAN."); } /** Calculate the hash value of the given bytes use {@link MessageDigest}. */ @@ -168,21 +178,30 @@ public class BinaryStringDataUtil { } /** - * Parses this BinaryStringData to DecimalData. + * Parses a {@link BinaryStringData} to {@link DecimalData}. * - * @return DecimalData value if the parsing was successful, or null if overflow - * @throws NumberFormatException if the parsing failed. + * @return DecimalData value if the parsing was successful. 
*/ - public static DecimalData toDecimal(BinaryStringData str, int precision, int scale) { + public static DecimalData toDecimal(BinaryStringData str, int precision, int scale) + throws NumberFormatException { str.ensureMaterialized(); + DecimalData data; + if (DecimalDataUtils.isByteArrayDecimal(precision) || DecimalDataUtils.isByteArrayDecimal(str.getSizeInBytes())) { - return toBigPrecisionDecimal(str, precision, scale); + data = toBigPrecisionDecimal(str, precision, scale); + } else { + int sizeInBytes = str.getSizeInBytes(); + data = + toDecimalFromBytes( + precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes); } - int sizeInBytes = str.getSizeInBytes(); - return toDecimalFromBytes(precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes); + if (data == null) { + throw numberFormatExceptionFor(str, "Overflow."); + } + return data; } private static DecimalData toDecimalFromBytes( @@ -353,12 +372,9 @@ public class BinaryStringDataUtil { break; } } - try { - BigDecimal bd = new BigDecimal(chars, start, end - start); - return DecimalData.fromBigDecimal(bd, precision, scale); - } catch (NumberFormatException nfe) { - return null; - } + + BigDecimal bd = new BigDecimal(chars, start, end - start); + return DecimalData.fromBigDecimal(bd, precision, scale); } } @@ -371,14 +387,12 @@ public class BinaryStringDataUtil { * Long.MIN_VALUE is '-9223372036854775808'. * * <p>This code is mostly copied from LazyLong.parseLong in Hive. - * - * @return Long value if the parsing was successful else null. 
*/ - public static Long toLong(BinaryStringData str) { + public static long toLong(BinaryStringData str) throws NumberFormatException { int sizeInBytes = str.getSizeInBytes(); byte[] tmpBytes = getTmpBytes(str, sizeInBytes); if (sizeInBytes == 0) { - return null; + throw numberFormatExceptionFor(str, "Input is empty."); } int i = 0; @@ -387,7 +401,7 @@ public class BinaryStringDataUtil { if (negative || b == '+') { i++; if (sizeInBytes == 1) { - return null; + throw numberFormatExceptionFor(str, "Input has only positive or negative symbol."); } } @@ -409,7 +423,7 @@ public class BinaryStringDataUtil { if (b >= '0' && b <= '9') { digit = b - '0'; } else { - return null; + throw numberFormatExceptionFor(str, "Invalid character found."); } // We are going to process the new digit and accumulate the result. However, before @@ -417,7 +431,7 @@ public class BinaryStringDataUtil { // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller // than minValue, and we can stop. if (result < stopValue) { - return null; + throw numberFormatExceptionFor(str, "Overflow."); } result = result * radix - digit; @@ -425,7 +439,7 @@ public class BinaryStringDataUtil { // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow. // If result overflows, we should stop. 
if (result > 0) { - return null; + throw numberFormatExceptionFor(str, "Overflow."); } } @@ -435,7 +449,7 @@ public class BinaryStringDataUtil { while (i < sizeInBytes) { byte currentByte = tmpBytes[i]; if (currentByte < '0' || currentByte > '9') { - return null; + throw numberFormatExceptionFor(str, "Invalid character found."); } i++; } @@ -443,7 +457,7 @@ public class BinaryStringDataUtil { if (!negative) { result = -result; if (result < 0) { - return null; + throw numberFormatExceptionFor(str, "Overflow."); } } return result; @@ -461,14 +475,12 @@ public class BinaryStringDataUtil { * * <p>Note that, this method is almost same as `toLong`, but we leave it duplicated for * performance reasons, like Hive does. - * - * @return Integer value if the parsing was successful else null. */ - public static Integer toInt(BinaryStringData str) { + public static int toInt(BinaryStringData str) throws NumberFormatException { int sizeInBytes = str.getSizeInBytes(); byte[] tmpBytes = getTmpBytes(str, sizeInBytes); if (sizeInBytes == 0) { - return null; + throw numberFormatExceptionFor(str, "Input is empty."); } int i = 0; @@ -477,7 +489,7 @@ public class BinaryStringDataUtil { if (negative || b == '+') { i++; if (sizeInBytes == 1) { - return null; + throw numberFormatExceptionFor(str, "Input has only positive or negative symbol."); } } @@ -499,7 +511,7 @@ public class BinaryStringDataUtil { if (b >= '0' && b <= '9') { digit = b - '0'; } else { - return null; + throw numberFormatExceptionFor(str, "Invalid character found."); } // We are going to process the new digit and accumulate the result. However, before @@ -507,7 +519,7 @@ public class BinaryStringDataUtil { // stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller // than minValue, and we can stop. 
if (result < stopValue) { - return null; + throw numberFormatExceptionFor(str, "Overflow."); } result = result * radix - digit; @@ -515,7 +527,7 @@ public class BinaryStringDataUtil { // stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow. // If result overflows, we should stop. if (result > 0) { - return null; + throw numberFormatExceptionFor(str, "Overflow."); } } @@ -525,7 +537,7 @@ public class BinaryStringDataUtil { while (i < sizeInBytes) { byte currentByte = tmpBytes[i]; if (currentByte < '0' || currentByte > '9') { - return null; + throw numberFormatExceptionFor(str, "Invalid character found."); } i++; } @@ -533,48 +545,77 @@ public class BinaryStringDataUtil { if (!negative) { result = -result; if (result < 0) { - return null; + throw numberFormatExceptionFor(str, "Overflow."); } } return result; } - public static Short toShort(BinaryStringData str) { - Integer intValue = toInt(str); - if (intValue != null) { - short result = intValue.shortValue(); - if (result == intValue) { - return result; - } + public static short toShort(BinaryStringData str) throws NumberFormatException { + int intValue = toInt(str); + short result = (short) intValue; + if (result == intValue) { + return result; } - return null; + throw numberFormatExceptionFor(str, "Overflow."); } - public static Byte toByte(BinaryStringData str) { - Integer intValue = toInt(str); - if (intValue != null) { - byte result = intValue.byteValue(); - if (result == intValue) { - return result; - } + public static byte toByte(BinaryStringData str) throws NumberFormatException { + int intValue = toInt(str); + byte result = (byte) intValue; + if (result == intValue) { + return result; } - return null; + throw numberFormatExceptionFor(str, "Overflow."); } - public static Double toDouble(BinaryStringData str) { - try { - return Double.valueOf(str.toString()); - } catch (NumberFormatException e) { - return null; + public static double toDouble(BinaryStringData str) throws 
NumberFormatException { + return Double.parseDouble(str.toString()); + } + + public static float toFloat(BinaryStringData str) throws NumberFormatException { + return Float.parseFloat(str.toString()); + } + + private static NumberFormatException numberFormatExceptionFor(StringData input, String reason) { + return new NumberFormatException("For input string: '" + input + "'. " + reason); + } + + public static int toDate(BinaryStringData input) throws DateTimeException { + Integer date = DateTimeUtils.dateStringToUnixDate(input.toString()); + if (date == null) { + throw new DateTimeException("For input string: '" + input + "'."); } + + return date; } - public static Float toFloat(BinaryStringData str) { - try { - return Float.valueOf(str.toString()); - } catch (NumberFormatException e) { - return null; + public static int toTime(BinaryStringData input) throws DateTimeException { + Integer date = DateTimeUtils.timeStringToUnixDate(input.toString()); + if (date == null) { + throw new DateTimeException("For input string: '" + input + "'."); + } + + return date; + } + + public static TimestampData toTimestamp(BinaryStringData input) throws DateTimeException { + TimestampData timestamp = DateTimeUtils.toTimestampData(input.toString()); + if (timestamp == null) { + throw new DateTimeException("For input string: '" + input + "'."); } + + return timestamp; + } + + public static TimestampData toTimestamp(BinaryStringData input, TimeZone timeZone) + throws DateTimeException { + Long timestamp = DateTimeUtils.toTimestamp(input.toString(), timeZone); + if (timestamp == null) { + throw new DateTimeException("For input string: '" + input + "'."); + } + + return TimestampData.fromEpochMillis(timestamp); } /** diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java index 4a8741f..bfb979d 100644 --- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java @@ -60,6 +60,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; /** @@ -420,29 +421,29 @@ public class BinaryStringDataTest { @Test public void testToNumeric() { // Test to integer. - assertEquals(Byte.valueOf("123"), toByte(fromString("123"))); - assertEquals(Byte.valueOf("123"), toByte(fromString("+123"))); - assertEquals(Byte.valueOf("-123"), toByte(fromString("-123"))); + assertEquals(Byte.parseByte("123"), toByte(fromString("123"))); + assertEquals(Byte.parseByte("123"), toByte(fromString("+123"))); + assertEquals(Byte.parseByte("-123"), toByte(fromString("-123"))); - assertEquals(Short.valueOf("123"), toShort(fromString("123"))); - assertEquals(Short.valueOf("123"), toShort(fromString("+123"))); - assertEquals(Short.valueOf("-123"), toShort(fromString("-123"))); + assertEquals(Short.parseShort("123"), toShort(fromString("123"))); + assertEquals(Short.parseShort("123"), toShort(fromString("+123"))); + assertEquals(Short.parseShort("-123"), toShort(fromString("-123"))); - assertEquals(Integer.valueOf("123"), toInt(fromString("123"))); - assertEquals(Integer.valueOf("123"), toInt(fromString("+123"))); - assertEquals(Integer.valueOf("-123"), toInt(fromString("-123"))); + assertEquals(Integer.parseInt("123"), toInt(fromString("123"))); + assertEquals(Integer.parseInt("123"), toInt(fromString("+123"))); + assertEquals(Integer.parseInt("-123"), toInt(fromString("-123"))); - assertEquals(Long.valueOf("1234567890"), toLong(fromString("1234567890"))); - assertEquals(Long.valueOf("+1234567890"), toLong(fromString("+1234567890"))); - 
assertEquals(Long.valueOf("-1234567890"), toLong(fromString("-1234567890"))); + assertEquals(Long.parseLong("1234567890"), toLong(fromString("1234567890"))); + assertEquals(Long.parseLong("+1234567890"), toLong(fromString("+1234567890"))); + assertEquals(Long.parseLong("-1234567890"), toLong(fromString("-1234567890"))); // Test decimal string to integer. - assertEquals(Integer.valueOf("123"), toInt(fromString("123.456789"))); - assertEquals(Long.valueOf("123"), toLong(fromString("123.456789"))); + assertEquals(Integer.parseInt("123"), toInt(fromString("123.456789"))); + assertEquals(Long.parseLong("123"), toLong(fromString("123.456789"))); // Test negative cases. - assertNull(toInt(fromString("1a3.456789"))); - assertNull(toInt(fromString("123.a56789"))); + assertThrows(NumberFormatException.class, () -> toInt(fromString("1a3.456789"))); + assertThrows(NumberFormatException.class, () -> toInt(fromString("123.a56789"))); // Test composite in BinaryRowData. BinaryRowData row = new BinaryRowData(20); @@ -453,10 +454,10 @@ public class BinaryStringDataTest { writer.writeString(3, BinaryStringData.fromString("123456789")); writer.complete(); - assertEquals(Byte.valueOf("1"), toByte(((BinaryStringData) row.getString(0)))); - assertEquals(Short.valueOf("123"), toShort(((BinaryStringData) row.getString(1)))); - assertEquals(Integer.valueOf("12345"), toInt(((BinaryStringData) row.getString(2)))); - assertEquals(Long.valueOf("123456789"), toLong(((BinaryStringData) row.getString(3)))); + assertEquals(Byte.parseByte("1"), toByte(((BinaryStringData) row.getString(0)))); + assertEquals(Short.parseShort("123"), toShort(((BinaryStringData) row.getString(1)))); + assertEquals(Integer.parseInt("12345"), toInt(((BinaryStringData) row.getString(2)))); + assertEquals(Long.parseLong("123456789"), toLong(((BinaryStringData) row.getString(3)))); } @Test