Airblader commented on a change in pull request #16046: URL: https://github.com/apache/flink/pull/16046#discussion_r649081779
##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
##########
@@ -543,6 +543,32 @@ object GenerateUtils {
       resultType)
   }
+  def generateWatermark(
+      ctx: CodeGeneratorContext,
+      contextTerm: String,
+      isTimestampLtz: Boolean): GeneratedExpression = {
+    val resultType = if (isTimestampLtz) {
+      new LocalZonedTimestampType(3)
+    } else {
+      new TimestampType(3)
+    }
+
+    val resultTypeTerm = primitiveTypeTermForType(resultType)
+    val Seq(resultTerm, nullTerm, currentWatermarkTerm) = ctx.addReusableLocalVariables(
+      (resultTypeTerm, "result"),
+      ("boolean", "isNull"),
+      ("long", "currentWatermark")
+    )
+
+    val code =
+      s"""
+         |$currentWatermarkTerm = $contextTerm.timerService().currentWatermark();

Review comment:
> what happens when used in DDL

This throws a validation exception because at that point in time the time column is not yet set to be a rowtime column, so we never reach the generated code in this case.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org