This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 3b790d16b73 [FLINK-27367][table-planner] Support cast int to date in legacy cast behavior 3b790d16b73 is described below commit 3b790d16b731bf6455a793207501a9bd5f0ee2a0 Author: Ron <ldliu...@163.com> AuthorDate: Tue Apr 26 16:58:44 2022 +0800 [FLINK-27367][table-planner] Support cast int to date in legacy cast behavior This closes #19574 --- .../planner/codegen/calls/ScalarOperatorGens.scala | 8 ++++++++ .../table/planner/runtime/batch/sql/CalcITCase.scala | 11 +++++++++++ .../table/planner/runtime/stream/sql/CalcITCase.scala | 19 +++++++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala index 8228f647cfe..3896e2c9c26 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala @@ -935,6 +935,14 @@ object ScalarOperatorGens { // Fallback to old cast rules (operand.resultType.getTypeRoot, targetType.getTypeRoot) match { + case (INTEGER, DATE) => + if (isLegacyCastBehaviourEnabled(ctx)) { + internalExprCasting(operand, targetType) + } else { + throw new CodeGenException( + "Only legacy cast behaviour supports cast from " + + s"'${operand.resultType}' to '$targetType'.") + } // identity casting case (_, _) if isInteroperable(operand.resultType, targetType) => diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index f6662ba96ff..7ba5779af5d 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala._ import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException} import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour import org.apache.flink.table.data.{DecimalDataUtils, TimestampData} import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF} @@ -70,6 +71,16 @@ class CalcITCase extends BatchTestBase { registerCollection("testTable", buildInData, buildInType, "a,b,c,d,e,f,g,h,i,j") } + @Test + def testSelectWithLegacyCastIntToDate(): Unit = { + tEnv.getConfig.getConfiguration.set( + ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR, + LegacyCastBehaviour.ENABLED) + checkResult( + "SELECT CASE WHEN true THEN CAST(2 AS INT) ELSE CAST('2017-12-11' AS DATE) END", + Seq(row("1970-01-03"))) + } + @Test def testSelectStar(): Unit = { checkResult( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 269b40d9d59..7b4f45f0828 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -23,6 +23,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import 
org.apache.flink.api.scala.typeutils.Types import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.api.{TableDescriptor, _} import org.apache.flink.table.data.{GenericRowData, MapData, RowData} @@ -51,6 +53,23 @@ class CalcITCase extends StreamingTestBase { @Rule def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE + @Test + def testSelectWithLegacyCastIntToDate(): Unit = { + tEnv.getConfig.getConfiguration.set( + ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR, + LegacyCastBehaviour.ENABLED) + + val result = tEnv.sqlQuery( + "SELECT CASE WHEN true THEN CAST(2 AS INT) ELSE CAST('2017-12-11' AS DATE) END") + .toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1970-01-03") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + @Test def testCastNumericToBooleanInCondition(): Unit ={ val sqlQuery =