This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 637d08e97f4 [FLINK-27089][table-planner] Fix bug with TRY_CAST in batch mode 637d08e97f4 is described below commit 637d08e97f40f7a4d2836fcd0e697eab8b2fe324 Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Wed Apr 6 17:13:35 2022 +0300 [FLINK-27089][table-planner] Fix bug with TRY_CAST in batch mode When in batch mode, the `getMonotonicity()` method of a function is called to determine possible optimisations. For `TRY_CAST` the implementation needs to call `getOperandType(1)` to get the target (also the function's return) type of the cast. This fails because, for `CAST` and `TRY_CAST`, we have only one operand at this point. `CAST` solves this in Calcite code, more specifically in `RexCallBinding#create()` where in case the `kind` of the function is `CAST`, a special `RexCastCallBinding` instance is created which stores the return (target) type and returns it when `getOperandType(1)` is called. For `TRY_CAST` we don't have access to the stack to do something similar, and we cannot set the kind of `TRY_CAST` to `CAST` (currently, it's `OTHER_FUNCTION`), as this would allow the Calcite stack to apply rules and optimisations to the `TRY_CAST` call and at some point convert it to a regular `CAST` call, thus breaking the functionality of `TRY_CAST` (return null instead of failing). As a solution to the problem, we simply don't implement the `getMonotonicity()` method for `TRY_CAST`, losing possible optimisations. This closes #19379. 
--- .../table/planner/functions/sql/SqlTryCastFunction.java | 8 +------- .../table/planner/expressions/ScalarOperatorsTest.scala | 13 +++++++++++++ .../flink/table/planner/runtime/batch/sql/CalcITCase.scala | 8 ++++++++ 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java index fffb5fe5b06..5c307e606f8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java @@ -33,8 +33,6 @@ import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.fun.SqlCastFunction; import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION; - /** * This class implements the {@code TRY_CAST} built-in, essentially delegating all the method * invocations, whenever is possible, to Calcite's {@link SqlCastFunction}. 
@@ -49,16 +47,12 @@ public class SqlTryCastFunction extends BuiltInSqlFunction { SqlTryCastFunction() { super( "TRY_CAST", - DEFAULT_VERSION, SqlKind.OTHER_FUNCTION, null, SqlStdOperatorTable.CAST .getOperandTypeInference(), // From Calcite's SqlCastFunction null, - SqlFunctionCategory.SYSTEM, - true, - false, - SqlStdOperatorTable.CAST::getMonotonicity); + SqlFunctionCategory.SYSTEM); } @Override diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala index 433660c080e..2f0f869f6cf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.expressions +import org.apache.flink.table.api.{DataTypes, LiteralStringExpression, UnresolvedFieldExpression} import org.apache.flink.table.planner.expressions.utils.ScalarOperatorsTestBase import org.junit.Test @@ -239,4 +240,16 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { "FALSE") testSqlApi("uuid() = cast(f22 as timestamp_ltz)", "NULL") } + + @Test + def testTryCast(): Unit = { + testAllApis( + "non-numeric".tryCast(DataTypes.BIGINT()), + "TRY_CAST ('non-numeric' AS BIGINT)", + "NULL") + testAllApis( + 'f10.tryCast(DataTypes.BIGINT()), + "TRY_CAST (f10 AS BIGINT)", + "NULL") + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index db904a19f42..45976cef02b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -1858,4 +1858,12 @@ class CalcITCase extends BatchTestBase { LocalTime.of(16, 50, 1, 123000000) ))) } + + @Test + def testTryCast(): Unit = { + checkResult("SELECT TRY_CAST('invalid' AS INT)", Seq(row(null))) + checkResult( + "SELECT TRY_CAST(g AS DOUBLE) FROM testTable", + Seq(row(null), row(null), row(null))) + } }