This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ab51671  [FLINK-21623][table-planner] Introduce 
CURRENT_ROW_TIMESTAMP() function
ab51671 is described below

commit ab51671617976fd56bd6bdb63620378dab0ae4c1
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Mon Mar 22 16:50:52 2021 +0800

    [FLINK-21623][table-planner] Introduce CURRENT_ROW_TIMESTAMP() function
    
    This closes #15306
---
 docs/data/sql_functions.yml                        |  2 ++
 flink-python/pyflink/table/expressions.py          |  4 +--
 .../functions/BuiltInFunctionDefinitions.java      |  7 +++++
 .../expressions/converter/DirectConvertRule.java   |  3 ++
 .../functions/sql/FlinkSqlOperatorTable.java       |  9 ++++++
 .../planner/codegen/calls/FunctionGenerator.scala  |  6 ++++
 .../expressions/NonDeterministicTests.scala        | 34 ++++++++++++++++++++--
 7 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index f963742..30b6eb6 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -432,6 +432,8 @@ temporal:
     description: Returns the current SQL timestamp in the local time zone, the 
return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record 
in streaming mode. But in batch mode, it is evaluated once as the query starts 
and uses the same result for every row.
   - sql: NOW()
     description: Returns the current SQL timestamp in the local time zone, 
this is a synonym of CURRENT_TIMESTAMP.
+  - sql: CURRENT_ROW_TIMESTAMP()
+    description: Returns the current SQL timestamp in the local time zone, the 
return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record 
in both batch and streaming mode.
   - sql: EXTRACT(timeinteravlunit FROM temporal)
     table: TEMPORAL.extract(TIMEINTERVALUNIT)
     description: Returns a long value extracted from the timeintervalunit part 
of temporal. E.g., EXTRACT(DAY FROM DATE '2006-06-05') returns 5.
diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index c79cbd0..179fac4 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -198,7 +198,7 @@ def current_time() -> Expression:
 def current_timestamp() -> Expression:
     """
     Returns the current SQL timestamp in local time zone,
-    the underlying function return type is TIMESTAMP_LTZ.
+    the return type of this expression is TIMESTAMP_LTZ.
     """
     return _leaf_op("currentTimestamp")
 
@@ -213,7 +213,7 @@ def local_time() -> Expression:
 def local_timestamp() -> Expression:
     """
     Returns the current SQL timestamp in local time zone,
-    the underlying function return type is TIMESTAMP.
+    the return type of this expression is TIMESTAMP.
     """
     return _leaf_op("localTimestamp")
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e93dc78..5fb74da 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1121,6 +1121,13 @@ public final class BuiltInFunctionDefinitions {
                     .outputTypeStrategy(TypeStrategies.MISSING)
                     .build();
 
+    public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("currentRowTimestamp")
+                    .kind(SCALAR)
+                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .build();
+
     public static final BuiltInFunctionDefinition LOCAL_TIME =
             BuiltInFunctionDefinition.newBuilder()
                     .name("localTime")
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 791cb78..11ae9f6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -172,6 +172,9 @@ public class DirectConvertRule implements 
CallExpressionConvertRule {
                 BuiltInFunctionDefinitions.CURRENT_TIMESTAMP,
                 FlinkSqlOperatorTable.CURRENT_TIMESTAMP);
         DEFINITION_OPERATOR_MAP.put(
+                BuiltInFunctionDefinitions.CURRENT_ROW_TIMESTAMP,
+                FlinkSqlOperatorTable.CURRENT_ROW_TIMESTAMP);
+        DEFINITION_OPERATOR_MAP.put(
                 BuiltInFunctionDefinitions.LOCAL_TIME, 
FlinkSqlOperatorTable.LOCALTIME);
         DEFINITION_OPERATOR_MAP.put(
                 BuiltInFunctionDefinitions.LOCAL_TIMESTAMP, 
FlinkSqlOperatorTable.LOCALTIMESTAMP);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 01cc25c..86f577b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -566,6 +566,15 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
     public static final SqlFunction CURRENT_TIMESTAMP =
             new SqlCurrentTimestampFunction("CURRENT_TIMESTAMP");
 
+    public static final SqlFunction CURRENT_ROW_TIMESTAMP =
+            new SqlCurrentTimestampFunction("CURRENT_ROW_TIMESTAMP") {
+
+                @Override
+                public SqlSyntax getSyntax() {
+                    return SqlSyntax.FUNCTION;
+                }
+            };
+
     public static final SqlFunction UNIX_TIMESTAMP =
             new SqlFunction(
                     "UNIX_TIMESTAMP",
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index db43723..18c91e3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -506,6 +506,12 @@ class FunctionGenerator private(config: TableConfig) {
     Seq(),
     new CurrentTimePointCallGen(false, isStreamingMode))
 
+  // CURRENT_ROW_TIMESTAMP is evaluated for every row, in both batch and streaming mode
+  addSqlFunction(
+    CURRENT_ROW_TIMESTAMP,
+    Seq(),
+    new CurrentTimePointCallGen(false, true))
+
   addSqlFunction(
     LOCALTIME,
     Seq(),
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
index 33d0367..a88135b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.table.planner.utils.InternalConfigOptions
 import org.apache.flink.types.Row
+
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -46,6 +47,7 @@ class NonDeterministicTests extends ExpressionTestBase {
       "CURRENT_DATE",
       "CURRENT_TIME",
       "CURRENT_TIMESTAMP",
+      "CURRENT_ROW_TIMESTAMP()",
       "NOW()",
       "LOCALTIME",
       "LOCALTIMESTAMP"))
@@ -55,15 +57,22 @@ class NonDeterministicTests extends ExpressionTestBase {
 
     assertEquals(round1.size, round2.size)
     round1.zip(round2).zipWithIndex.foreach {
-      case ((result1: String, result2: String), index: Int) => {
+      case ((result1: String, result2: String), index: Int) =>
         // CURRENT_DATE may be same between two records
         if (index == 0) {
           assert(result1 <= result2)
         } else {
           assert(result1 < result2)
         }
-      }
     }
+
+    // CURRENT_TIMESTAMP and CURRENT_ROW_TIMESTAMP() should return the same value
+    // for a single record in a streaming job
+    val currentTimeStampIndex = 2
+    val currentRowTimestampIndex = 3
+    assertEquals(round1(currentTimeStampIndex), 
round1(currentRowTimestampIndex))
+    assertEquals(round2(currentTimeStampIndex), 
round2(currentRowTimestampIndex))
+
   }
 
   @Test
@@ -98,6 +107,22 @@ class NonDeterministicTests extends ExpressionTestBase {
   }
 
   @Test
+  def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
+    config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH)
+    val temporalFunctions = 
getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
+
+    val round1 = evaluateFunctionResult(temporalFunctions)
+    Thread.sleep(1 * 1000L)
+    val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+    assertEquals(round1.size, round2.size)
+    round1.zip(round2).foreach {
+      case (result1: String, result2: String) =>
+        assert(result1 < result2)
+    }
+  }
+
+  @Test
   def testTemporalFunctionsInUTC(): Unit = {
     testTemporalTimestamp(ZoneId.of("UTC"))
   }
@@ -148,6 +173,11 @@ class NonDeterministicTests extends ExpressionTestBase {
     testSqlApi(
       s"TIMESTAMPDIFF(SECOND, ${timestampLtz(formattedCurrentTimestamp)}, 
NOW()) <= 60",
       "true")
+
+    testSqlApi(
+      s"TIMESTAMPDIFF(SECOND, " +
+        s"${timestampLtz(formattedCurrentTimestamp)}, CURRENT_ROW_TIMESTAMP()) 
<= 60",
+      "true")
   }
 
   @Test

Reply via email to