This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ab51671 [FLINK-21623][table-planner] Introduce CURRENT_ROW_TIMESTAMP() function ab51671 is described below commit ab51671617976fd56bd6bdb63620378dab0ae4c1 Author: Leonard Xu <xbjt...@gmail.com> AuthorDate: Mon Mar 22 16:50:52 2021 +0800 [FLINK-21623][table-planner] Introduce CURRENT_ROW_TIMESTAMP() function This closes #15306 --- docs/data/sql_functions.yml | 2 ++ flink-python/pyflink/table/expressions.py | 4 +-- .../functions/BuiltInFunctionDefinitions.java | 7 +++++ .../expressions/converter/DirectConvertRule.java | 3 ++ .../functions/sql/FlinkSqlOperatorTable.java | 9 ++++++ .../planner/codegen/calls/FunctionGenerator.scala | 6 ++++ .../expressions/NonDeterministicTests.scala | 34 ++++++++++++++++++++-- 7 files changed, 61 insertions(+), 4 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index f963742..30b6eb6 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -432,6 +432,8 @@ temporal: description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. - sql: NOW() description: Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. + - sql: CURRENT_ROW_TIMESTAMP() + description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record no matter in batch or streaming mode. - sql: EXTRACT(timeinteravlunit FROM temporal) table: TEMPORAL.extract(TIMEINTERVALUNIT) description: Returns a long value extracted from the timeintervalunit part of temporal. E.g., EXTRACT(DAY FROM DATE '2006-06-05') returns 5. diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index c79cbd0..179fac4 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -198,7 +198,7 @@ def current_time() -> Expression: def current_timestamp() -> Expression: """ Returns the current SQL timestamp in local time zone, - the underlying function return type is TIMESTAMP_LTZ. + the return type of this expression is TIMESTAMP_LTZ. """ return _leaf_op("currentTimestamp") @@ -213,7 +213,7 @@ def local_time() -> Expression: def local_timestamp() -> Expression: """ Returns the current SQL timestamp in local time zone, - the underlying function return type is TIMESTAMP. + the return type of this expression s TIMESTAMP. """ return _leaf_op("localTimestamp") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index e93dc78..5fb74da 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1121,6 +1121,13 @@ public final class BuiltInFunctionDefinitions { .outputTypeStrategy(TypeStrategies.MISSING) .build(); + public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP = + BuiltInFunctionDefinition.newBuilder() + .name("currentRowTimestamp") + .kind(SCALAR) + .outputTypeStrategy(TypeStrategies.MISSING) + .build(); + public static final BuiltInFunctionDefinition LOCAL_TIME = BuiltInFunctionDefinition.newBuilder() .name("localTime") diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java index 791cb78..11ae9f6 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java @@ -172,6 +172,9 @@ public class DirectConvertRule implements CallExpressionConvertRule { BuiltInFunctionDefinitions.CURRENT_TIMESTAMP, FlinkSqlOperatorTable.CURRENT_TIMESTAMP); DEFINITION_OPERATOR_MAP.put( + BuiltInFunctionDefinitions.CURRENT_ROW_TIMESTAMP, + FlinkSqlOperatorTable.CURRENT_ROW_TIMESTAMP); + DEFINITION_OPERATOR_MAP.put( BuiltInFunctionDefinitions.LOCAL_TIME, FlinkSqlOperatorTable.LOCALTIME); DEFINITION_OPERATOR_MAP.put( BuiltInFunctionDefinitions.LOCAL_TIMESTAMP, FlinkSqlOperatorTable.LOCALTIMESTAMP); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 01cc25c..86f577b 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -566,6 +566,15 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction CURRENT_TIMESTAMP = new SqlCurrentTimestampFunction("CURRENT_TIMESTAMP"); + public static final SqlFunction CURRENT_ROW_TIMESTAMP = + new SqlCurrentTimestampFunction("CURRENT_ROW_TIMESTAMP") { + + @Override + public SqlSyntax getSyntax() { + return SqlSyntax.FUNCTION; + } + }; + public static final SqlFunction UNIX_TIMESTAMP = new SqlFunction( "UNIX_TIMESTAMP", diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala index db43723..18c91e3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala @@ -506,6 +506,12 @@ class FunctionGenerator private(config: TableConfig) { Seq(), new CurrentTimePointCallGen(false, isStreamingMode)) + // CURRENT_ROW_TIMESTAMP evaluates in row-level + addSqlFunction( + CURRENT_ROW_TIMESTAMP, + Seq(), + new CurrentTimePointCallGen(false, true)) + addSqlFunction( LOCALTIME, Seq(), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala index 33d0367..a88135b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala @@ -30,6 +30,7 @@ import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase import org.apache.flink.table.planner.utils.InternalConfigOptions import org.apache.flink.types.Row + import org.junit.Assert.assertEquals import org.junit.Test @@ -46,6 +47,7 @@ class NonDeterministicTests extends ExpressionTestBase { "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", + "CURRENT_ROW_TIMESTAMP()", "NOW()", "LOCALTIME", "LOCALTIMESTAMP")) @@ -55,15 +57,22 @@ class NonDeterministicTests extends ExpressionTestBase { assertEquals(round1.size, round2.size) round1.zip(round2).zipWithIndex.foreach { - case ((result1: String, result2: String), index: Int) => { + case ((result1: String, result2: String), index: Int) => // CURRENT_DATE may be same between two records if (index == 0) { assert(result1 <= result2) } else { assert(result1 < result2) } - } } + + // check CURRENT_TIMESTAMP function and CURRENT_ROW_TIMESTAMP() function + // should return same value for one record in stream job + val currentTimeStampIndex = 2 + val currentRowTimestampIndex = 3 + assertEquals(round1(currentTimeStampIndex), round1(currentRowTimestampIndex)) + assertEquals(round2(currentTimeStampIndex), round2(currentRowTimestampIndex)) + } @Test @@ -98,6 +107,22 @@ class NonDeterministicTests extends ExpressionTestBase { } @Test + def testCurrentRowTimestampFunctionsInBatchMode(): Unit = { + config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH) + val temporalFunctions = getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()")) + + val round1 = evaluateFunctionResult(temporalFunctions) + Thread.sleep(1 * 1000L) + val round2: List[String] = evaluateFunctionResult(temporalFunctions) + + assertEquals(round1.size, round2.size) + round1.zip(round2).foreach { + case (result1: String, result2: String) => + assert(result1 < result2) + } + } + + @Test def testTemporalFunctionsInUTC(): Unit = { testTemporalTimestamp(ZoneId.of("UTC")) } @@ -148,6 +173,11 @@ class NonDeterministicTests extends ExpressionTestBase { testSqlApi( s"TIMESTAMPDIFF(SECOND, ${timestampLtz(formattedCurrentTimestamp)}, NOW()) <= 60", "true") + + testSqlApi( + s"TIMESTAMPDIFF(SECOND, " + + s"${timestampLtz(formattedCurrentTimestamp)}, CURRENT_ROW_TIMESTAMP()) <= 60", + "true") } @Test