Airblader commented on a change in pull request #16046:
URL: https://github.com/apache/flink/pull/16046#discussion_r649081779



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
##########
@@ -543,6 +543,32 @@ object GenerateUtils {
       resultType)
   }
 
+  def generateWatermark(
+      ctx: CodeGeneratorContext,
+      contextTerm: String,
+      isTimestampLtz: Boolean): GeneratedExpression = {
+    val resultType = if (isTimestampLtz) {
+      new LocalZonedTimestampType(3)
+    } else {
+      new TimestampType(3)
+    }
+
+    val resultTypeTerm = primitiveTypeTermForType(resultType)
+    val Seq(resultTerm, nullTerm, currentWatermarkTerm) = 
ctx.addReusableLocalVariables(
+      (resultTypeTerm, "result"),
+      ("boolean", "isNull"),
+      ("long", "currentWatermark")
+    )
+
+    val code =
+      s"""
+         |$currentWatermarkTerm = 
$contextTerm.timerService().currentWatermark();

Review comment:
       > what happens when used in DDL
   
   This throws a validation exception: at that point the time column has not 
yet been marked as a rowtime column, so the generated code is never reached 
in this case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to