This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8dbc64105c13dfae467f2cfb75ec67d0d2d56c84 Author: Jark Wu <imj...@gmail.com> AuthorDate: Tue Aug 6 21:53:56 2019 +0800 [FLINK-13561][table-planner-blink] Fix FROM_UNIXTIME(bigint [,format]) should work in session time zone This aligns the behavior to other systems (MySQL, Spark). --- .../functions/sql/FlinkSqlOperatorTable.java | 4 +- .../flink/table/planner/codegen/CodeGenUtils.scala | 2 - .../planner/codegen/CodeGeneratorContext.scala | 11 -- .../planner/codegen/calls/BuiltInMethods.scala | 25 +--- .../planner/codegen/calls/FunctionGenerator.scala | 16 --- .../planner/expressions/ScalarFunctionsTest.scala | 62 +-------- .../planner/expressions/TemporalTypesTest.scala | 143 +++++++++------------ .../planner/runtime/batch/sql/CalcITCase.scala | 8 -- .../runtime/stream/sql/CorrelateITCase.scala | 2 +- 9 files changed, 69 insertions(+), 204 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 91fd8c3..f78bd1c 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -653,8 +653,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { VARCHAR_2000_NULLABLE, null, OperandTypes.or( - OperandTypes.family(SqlTypeFamily.NUMERIC), - OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)), + OperandTypes.family(SqlTypeFamily.INTEGER), + OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.STRING)), SqlFunctionCategory.TIMEDATE); public static final SqlFunction IF = new SqlFunction( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala index 4ecfdbb..473b788 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala @@ -47,8 +47,6 @@ object CodeGenUtils { val DEFAULT_TIMEZONE_TERM = "timeZone" - val DEFAULT_TIMEZONE_ID_TERM = "zoneId" - val DEFAULT_INPUT1_TERM = "in1" val DEFAULT_INPUT2_TERM = "in2" diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala index 7e67a12..4244fd0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala @@ -519,17 +519,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) { } /** - * Adds a reusable Time ZoneId to the member area of the generated class. - */ - def addReusableTimeZoneID(): String = { - val zoneID = tableConfig.getLocalTimeZone.getId - val stmt = - s"""private static final java.time.ZoneId $DEFAULT_TIMEZONE_TERM = - | java.time.ZoneId.of("$zoneID");""".stripMargin - DEFAULT_TIMEZONE_ID_TERM - } - - /** * Adds a reusable [[java.util.Random]] to the member area of the generated class. * * The seed parameter must be a literal/constant expression. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala index ff4796b..c46d432 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala @@ -290,29 +290,11 @@ object BuiltInMethods { classOf[SqlDateTimeUtils], "unixTimestamp", classOf[Long]) val FROM_UNIXTIME_FORMAT = Types.lookupMethod( - classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String]) - - val FROM_UNIXTIME = Types.lookupMethod( - classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long]) - - val FROM_UNIXTIME_AS_DOUBLE = Types.lookupMethod( - classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double]) - - val FROM_UNIXTIME_AS_DECIMAL = Types.lookupMethod( - classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal]) - - val FROM_UNIXTIME_FORMAT_TIME_ZONE = Types.lookupMethod( classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone]) - val FROM_UNIXTIME_TIME_ZONE = Types.lookupMethod( + val FROM_UNIXTIME = Types.lookupMethod( classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone]) - val FROM_UNIXTIME_AS_DOUBLE_TIME_ZONE = Types.lookupMethod( - classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone]) - - val FROM_UNIXTIME_AS_DECIMAL_TIME_ZONE = Types.lookupMethod( - classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal], classOf[TimeZone]) - val DATEDIFF_T_S_TIME_ZONE = Types.lookupMethod( classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String], classOf[TimeZone]) @@ -361,11 +343,6 @@ object BuiltInMethods { val DATE_ADD_T = Types.lookupMethod( classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int]) - val INT_TO_DATE = Types.lookupMethod( - classOf[SqlDateTimeUtils], - "toDate", - classOf[Int]) - val LONG_TO_TIMESTAMP = Types.lookupMethod( classOf[SqlDateTimeUtils], "toTimestamp", diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala index 348907a..bf80027 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala @@ -792,11 +792,6 @@ object FunctionGenerator { Seq(DECIMAL), new HashCodeCallGen()) - addSqlFunctionMethod( - TO_DATE, - Seq(INTEGER), - BuiltInMethods.INT_TO_DATE) - INTEGRAL_TYPES foreach ( dt => addSqlFunctionMethod(TO_TIMESTAMP, Seq(dt), @@ -817,17 +812,6 @@ object FunctionGenerator { Seq(dt), BuiltInMethods.FROM_UNIXTIME)) - FRACTIONAL_TYPES foreach ( - dt => addSqlFunctionMethod( - FROM_UNIXTIME, - Seq(dt), - BuiltInMethods.FROM_UNIXTIME_AS_DOUBLE)) - - addSqlFunctionMethod( - FROM_UNIXTIME, - Seq(DECIMAL), - BuiltInMethods.FROM_UNIXTIME_AS_DECIMAL) - addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT) addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala index 0e5f120..ad225f3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala @@ -22,9 +22,13 @@ import org.apache.flink.table.api.scala.{currentDate, currentTime, currentTimest import org.apache.flink.table.api.{DataTypes, Types} import org.apache.flink.table.expressions.{Expression, ExpressionParser, TimeIntervalUnit, TimePointUnit} import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase - import org.junit.Test +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.time.ZoneId +import java.util.Locale + class ScalarFunctionsTest extends ScalarTypesTestBase { // ---------------------------------------------------------------------------------------------- @@ -3528,7 +3532,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { @Test def testToTimestamp(): Unit = { testSqlApi("to_timestamp('abc')", "null") - testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000") testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000") testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000") testSqlApi("to_timestamp('2017-09-15', 'yyyy-MM-dd')", "2017-09-15 00:00:00.000") @@ -3541,30 +3544,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { testSqlApi("to_date('2017-09-15 00:00:00')", "2017-09-15") } - @Test - def testDateSub(): Unit = { - testSqlApi("date_sub(f18, 10)", "1996-10-31") - testSqlApi("date_sub(f18, -10)", "1996-11-20") - testSqlApi("date_sub(TIMESTAMP '2017-10-15 23:00:00', 30)", "2017-09-15") - testSqlApi("date_sub(f40, 30)", "null") - testSqlApi("date_sub(CAST(NULL AS TIMESTAMP), 30)", "null") - testSqlApi("date_sub(CAST(NULL AS VARCHAR), 30)", "null") - testSqlApi("date_sub('2017-10--11', 30)", "null") - testSqlApi("date_sub('2017--10-11', 30)", "null") - } - - @Test - def testDateAdd(): Unit = { - testSqlApi("date_add(f18, 10)", "1996-11-20") - testSqlApi("date_add(f18, -10)", "1996-10-31") - testSqlApi("date_add(TIMESTAMP '2017-10-15 23:00:00', 30)", "2017-11-14") - testSqlApi("date_add(f40, 30)", "null") - testSqlApi("date_add(CAST(NULL AS TIMESTAMP), 30)", "null") - testSqlApi("date_add(CAST(NULL AS VARCHAR), 30)", "null") - testSqlApi("date_add('2017-10--11', 30)", "null") - testSqlApi("date_add('2017--10-11', 30)", "null") - } - // ---------------------------------------------------------------------------------------------- // Hash functions // ---------------------------------------------------------------------------------------------- @@ -3992,37 +3971,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { } @Test - def testFromUnixTimeWithNumeric(): Unit = { - // Test integral and fractional numeric from_unixtime. - testSqlApi( - "from_unixtime(f2)", - "1970-01-01 00:00:42") - testSqlApi( - "from_unixtime(f3)", - "1970-01-01 00:00:43") - testSqlApi( - "from_unixtime(f4)", - "1970-01-01 00:00:44") - testSqlApi( - "from_unixtime(f5)", - "1970-01-01 00:00:04") - testSqlApi( - "from_unixtime(f6)", - "1970-01-01 00:00:04") - testSqlApi( - "from_unixtime(f7)", - "1970-01-01 00:00:03") - // Test decimal to from_unixtime. - testSqlApi( - "from_unixtime(f15)", - "1969-12-31 23:39:29") - // test with null input - testSqlApi( - "from_unixtime(cast(null as int))", - "null") - } - - @Test def testIsDecimal(): Unit = { testSqlApi( "IS_DECIMAL('1')", diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala index e4b98c5..4dc61a5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala @@ -28,13 +28,12 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil import org.apache.flink.table.planner.utils.DateTimeTestUtil._ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.types.Row - import org.junit.Test import java.sql.Timestamp import java.text.SimpleDateFormat import java.time.{Instant, ZoneId} -import java.util.TimeZone +import java.util.{Locale, TimeZone} class TemporalTypesTest extends ExpressionTestBase { @@ -742,85 +741,6 @@ class TemporalTypesTest extends ExpressionTestBase { } @Test - def testUTCTimeZone(): Unit = { - config.setLocalTimeZone(ZoneId.of("UTC")) - - // Test Calcite's RexLiteral - // 1521025200000 = UTC: 2018-03-14 11:00:00 - testSqlApi("TIMESTAMP '2018-03-14 11:00:00'", "2018-03-14 11:00:00.000") - - testSqlApi("DATE '2018-03-14'", "2018-03-14") - testSqlApi("TIME '19:20:21'", "19:20:21") - - testSqlApi("TO_TIMESTAMP('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss')", - "2018-03-14 11:00:00.000") - testSqlApi("TO_TIMESTAMP('2018-03-14 11:00:00')", - "2018-03-14 11:00:00.000") - testSqlApi("TO_TIMESTAMP(1521025200000)", "2018-03-14 11:00:00.000") - - // 1521025200000 "2018-03-14T11:00:00+0000" - testSqlApi("FROM_UNIXTIME(1521025200)", - "2018-03-14 11:00:00") - testSqlApi("FROM_UNIXTIME(1521025200, 'yyyy-MM-dd HH:mm:ss')", - "2018-03-14 11:00:00") - testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP(TO_TIMESTAMP('2018-03-14 11:00:00.0')))", - "2018-03-14 11:00:00") - testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP('2018-03-14 11:00:00.0'))", - "2018-03-14 11:00:00") - - // 1520960523000 "2018-03-13T17:02:03+0000" - testSqlApi("Date_ADD(TO_TIMESTAMP(1520960523000), 2)", "2018-03-15") - testSqlApi("Date_ADD(TO_TIMESTAMP('2018-03-13 17:02:03'), 2)", "2018-03-15") - testSqlApi("Date_ADD('2018-03-13 17:02:03', 2)", "2018-03-15") - testSqlApi("Date_SUB(TO_TIMESTAMP(1520960523000), 2)", "2018-03-11") - testSqlApi("Date_SUB(TO_TIMESTAMP('2018-03-13 17:02:03'), 2)", "2018-03-11") - testSqlApi("Date_SUB('2018-03-13 17:02:03', 2)", "2018-03-11") - testSqlApi("date_add('2017--10-11', 30)", "null") - testSqlApi("date_sub('2017--10-11', 30)", "null") - - // DATE_DIFF - testSqlApi("DATEDIFF(TO_TIMESTAMP(1520960523000), '2018-03-13 17:02:03')", "0") - testSqlApi("DATEDIFF(TO_TIMESTAMP(1520827201000), TO_TIMESTAMP(1520740801000))", "1") - - // DATE_FORMAT - // 1520960523000 "2018-03-13 17:02:03+0000" - testSqlApi("DATE_FORMAT('2018-03-13 17:02:03', 'yyyy-MM-dd HH:mm:ss', " + - "'yyyy/MM/dd HH:mm:ss')", "2018/03/13 17:02:03") - testSqlApi("DATE_FORMAT(TO_TIMESTAMP(1520960523000), 'yyyy-MM-dd HH:mm:ss')", - "2018-03-13 17:02:03") - } - - @Test - def testDaylightSavingTimeZone(): Unit = { - config.setLocalTimeZone(ZoneId.of("America/New_York")) - - // TODO: add more testcases & fully support DST - // Daylight Saving - // America/New_York: -5:00, -4:00(DST) - // 2018-03-11 02:00:00 -> 2018:-3-11 03:00:00 - // 2018-11-04 02:00:00 -> 2018-11-04 01:00:00 - - // Test Calcite's RexLiteral - testSqlApi("TIMESTAMP '2018-03-14 07:00:00'", "2018-03-14 07:00:00.000") - - testSqlApi("TO_TIMESTAMP('2018-03-14 07:00:00', 'yyyy-MM-dd HH:mm:ss')", - "2018-03-14 07:00:00.000") - testSqlApi("TO_TIMESTAMP('2018-03-14 07:00:00')", - "2018-03-14 07:00:00.000") - testSqlApi("f18", "2018-03-14 07:00:00.000") - - testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP(TO_TIMESTAMP('2018-03-14 07:00:00.0')))", - "2018-03-14 07:00:00") - testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP('2018-03-14 07:00:00.0'))", - "2018-03-14 07:00:00") - - // DATE_FORMAT - testSqlApi("DATE_FORMAT('2018-03-13 13:02:03', 'yyyy-MM-dd HH:mm:ss', " + - "'yyyy/MM/dd HH:mm:ss')", "2018/03/13 13:02:03") - testSqlApi("DATE_FORMAT(f19, 'yyyy-MM-dd HH:mm:ss')", "2018-03-13 13:02:03") - } - - @Test def testHourUnitRangoonTimeZone(): Unit = { // Asia/Rangoon UTC Offset 6.5 config.setLocalTimeZone(ZoneId.of("Asia/Rangoon")) @@ -879,10 +799,63 @@ class TemporalTypesTest extends ExpressionTestBase { "2018-03-14 19:00:00") } + @Test + def testFromUnixTime(): Unit = { + val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd HH-mm-ss" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + + testSqlApi( + "from_unixtime(f21)", + sdf1.format(new Timestamp(44000))) + testSqlApi( + s"from_unixtime(f21, '$fmt2')", + sdf2.format(new Timestamp(44000))) + testSqlApi( + s"from_unixtime(f21, '$fmt3')", + sdf3.format(new Timestamp(44000))) + + testSqlApi( + "from_unixtime(f22)", + sdf1.format(new Timestamp(3000))) + testSqlApi( + s"from_unixtime(f22, '$fmt2')", + sdf2.format(new Timestamp(3000))) + testSqlApi( + s"from_unixtime(f22, '$fmt3')", + sdf3.format(new Timestamp(3000))) + + // test with null input + testSqlApi( + "from_unixtime(cast(null as int))", + "null") + } + + @Test + def testFromUnixTimeInTokyo(): Unit = { + config.setLocalTimeZone(ZoneId.of("Asia/Tokyo")) + val fmt = "yy-MM-dd HH-mm-ss" + testSqlApi( + "from_unixtime(f21)", + "1970-01-01 09:00:44") + testSqlApi( + s"from_unixtime(f21, '$fmt')", + "70-01-01 09-00-44") + + testSqlApi( + "from_unixtime(f22)", + "1970-01-01 09:00:03") + testSqlApi( + s"from_unixtime(f22, '$fmt')", + "70-01-01 09-00-03") + } + // ---------------------------------------------------------------------------------------------- override def testData: Row = { - val testData = new Row(21) + val testData = new Row(23) testData.setField(0, localDate("1990-10-14")) testData.setField(1, DateTimeTestUtil.localTime("10:20:45")) testData.setField(2, localDateTime("1990-10-14 10:20:45.123")) @@ -908,6 +881,8 @@ class TemporalTypesTest extends ExpressionTestBase { testData.setField(18, Instant.ofEpochMilli(1521025200000L)) testData.setField(19, Instant.ofEpochMilli(1520960523000L)) testData.setField(20, Instant.ofEpochMilli(1520827201000L)) + testData.setField(21, 44L) + testData.setField(22, 3) testData } @@ -933,6 +908,8 @@ class TemporalTypesTest extends ExpressionTestBase { /* 17 */ Types.INSTANT, /* 18 */ Types.INSTANT, /* 19 */ Types.INSTANT, - /* 20 */ Types.INSTANT) + /* 20 */ Types.INSTANT, + /* 21 */ Types.LONG, + /* 22 */ Types.INT) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index 50a33c3..17f1d6f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -1158,14 +1158,6 @@ class CalcITCase extends BatchTestBase { } @Test - def testFromUnixTime(): Unit = { - checkResult("SELECT" + - " FROM_UNIXTIME(1513193130), FROM_UNIXTIME(1513193130, 'MM/dd/yyyy HH:mm:ss')" + - " FROM testTable WHERE a = TRUE", - Seq(row("2017-12-13 19:25:30", "12/13/2017 19:25:30"))) - } - - @Test def testToDate(): Unit = { checkResult("SELECT" + " TO_DATE(CAST(null AS VARCHAR))," + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala index fa6be95..91f7922 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala @@ -214,7 +214,7 @@ class CorrelateITCase extends StreamingTestBase { val sql = """ |SELECT * FROM TMP1 - |where TIMESTAMP_ADD(day, 3, v) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd') + |where TIMESTAMPADD(day, 3, cast(v as date)) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd') """.stripMargin val sink = new TestingAppendSink