This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 3b790d16b73 [FLINK-27367][table-planner] Support cast int to date in legacy cast behavior
3b790d16b73 is described below

commit 3b790d16b731bf6455a793207501a9bd5f0ee2a0
Author: Ron <ldliu...@163.com>
AuthorDate: Tue Apr 26 16:58:44 2022 +0800

    [FLINK-27367][table-planner] Support cast int to date in legacy cast behavior
    
    This closes #19574
---
 .../planner/codegen/calls/ScalarOperatorGens.scala    |  8 ++++++++
 .../table/planner/runtime/batch/sql/CalcITCase.scala  | 11 +++++++++++
 .../table/planner/runtime/stream/sql/CalcITCase.scala | 19 +++++++++++++++++++
 3 files changed, 38 insertions(+)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 8228f647cfe..3896e2c9c26 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -935,6 +935,14 @@ object ScalarOperatorGens {
 
     // Fallback to old cast rules
     (operand.resultType.getTypeRoot, targetType.getTypeRoot) match {
+      case (INTEGER, DATE) =>
+        if (isLegacyCastBehaviourEnabled(ctx)) {
+          internalExprCasting(operand, targetType)
+        } else {
+          throw new CodeGenException(
+            "Only legacy cast behaviour supports cast from "
+              + s"'${operand.resultType}' to '$targetType'.")
+        }
 
       // identity casting
       case (_, _) if isInteroperable(operand.resultType, targetType) =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index f6662ba96ff..7ba5779af5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
 import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
 import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
 import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
@@ -70,6 +71,16 @@ class CalcITCase extends BatchTestBase {
     registerCollection("testTable", buildInData, buildInType, "a,b,c,d,e,f,g,h,i,j")
   }
 
+  @Test
+  def testSelectWithLegacyCastIntToDate(): Unit = {
+    tEnv.getConfig.getConfiguration.set(
+      ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
+      LegacyCastBehaviour.ENABLED)
+    checkResult(
+      "SELECT CASE WHEN true THEN CAST(2 AS INT) ELSE CAST('2017-12-11' AS DATE) END",
+      Seq(row("1970-01-03")))
+  }
+
   @Test
   def testSelectStar(): Unit = {
     checkResult(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 269b40d9d59..7b4f45f0828 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.api.{TableDescriptor, _}
 import org.apache.flink.table.data.{GenericRowData, MapData, RowData}
@@ -51,6 +53,23 @@ class CalcITCase extends StreamingTestBase {
   @Rule
   def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
 
+  @Test
+  def testSelectWithLegacyCastIntToDate(): Unit = {
+    tEnv.getConfig.getConfiguration.set(
+      ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
+      LegacyCastBehaviour.ENABLED)
+
+    val result = tEnv.sqlQuery(
+      "SELECT CASE WHEN true THEN CAST(2 AS INT) ELSE CAST('2017-12-11' AS DATE) END")
+      .toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1970-01-03")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
   @Test
   def testCastNumericToBooleanInCondition(): Unit ={
     val sqlQuery =

Reply via email to