Hongbo created FLINK-28693: ------------------------------ Summary: Codegen failed if the watermark is defined on a columnByExpression Key: FLINK-28693 URL: https://issues.apache.org/jira/browse/FLINK-28693 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.15.1 Reporter: Hongbo
The following code will throw an exception: ``` {color:#000000} {color}{color:#008080}Table{color}{color:#000000} program cannot be compiled. {color}{color:#0000ff}This{color}{color:#000000} is a bug. {color}{color:#008080}Please{color}{color:#000000} file an issue.{color} {color:#000000} ...{color} {color:#008080} Caused{color}{color:#000000} {color}{color:#0000ff}by{color}{color:#000000}: org.codehaus.commons.compiler.{color}{color:#008080}CompileException{color}{color:#000000}: {color}{color:#008080}Line{color}{color:#000000} {color}{color:#098658}29{color}{color:#000000}, {color}{color:#008080}Column{color}{color:#000000} {color}{color:#098658}54{color}{color:#000000}: {color}{color:#008080}Cannot{color}{color:#000000} determine simple {color}{color:#0000ff}type{color}{color:#000000} name {color}{color:#a31515}"org"{color} {color:#000000}```{color} {color:#000000}Code:{color} {color:#000000}```{color} {color:#cc7832}public class {color}TestUdf {color:#cc7832}extends {color}ScalarFunction { {color:#bbb529}@DataTypeHint{color}({color:#6a8759}"TIMESTAMP(3)"{color}) {color:#cc7832}public {color}LocalDateTime {color:#ffc66d}eval{color}(String strDate) { {color:#cc7832}return {color}LocalDateTime.now(){color:#cc7832}; {color}{color:#cc7832} {color}} } {color:#cc7832}public{color}{color:#cc7832} class {color}FlinkTest { {color:#bbb529}@Test {color}{color:#bbb529} {color}{color:#cc7832}void {color}{color:#ffc66d}testUdf{color}() {color:#cc7832}throws {color}Exception { {color:#808080}//var env = StreamExecutionEnvironment.createLocalEnvironment(); {color}{color:#808080} // run `gradlew shadowJar` first to generate the uber jar. {color}{color:#808080} // It contains the kafka connector and a dummy UDF function. {color}{color:#808080} {color}{color:#808080} {color}{color:#cc7832}var {color}env = StreamExecutionEnvironment.createRemoteEnvironment({color:#6a8759}"localhost"{color}{color:#cc7832}, {color}{color:#6897bb}8081{color}{color:#cc7832}, {color}{color:#cc7832} {color}{color:#6a8759}"build/libs/flink-test-all.jar"{color}){color:#cc7832}; {color}{color:#cc7832} {color}env.setParallelism({color:#6897bb}1{color}){color:#cc7832}; {color}{color:#cc7832} var {color}tableEnv = StreamTableEnvironment.create(env){color:#cc7832}; {color}{color:#cc7832} {color}tableEnv.createTemporarySystemFunction({color:#6a8759}"TEST_UDF"{color}{color:#cc7832}, {color}TestUdf.{color:#cc7832}class{color}){color:#cc7832}; {color}{color:#cc7832} {color}{color:#cc7832} var {color}testTable = tableEnv.from(TableDescriptor.forConnector({color:#6a8759}"kafka"{color}) .schema(Schema.newBuilder() .column({color:#6a8759}"time_stamp"{color}{color:#cc7832}, {color}DataTypes.STRING()) .columnByExpression({color:#6a8759}"udf_ts"{color}{color:#cc7832}, {color}{color:#6a8759}"TEST_UDF(time_stamp)"{color}) .watermark({color:#6a8759}"udf_ts"{color}{color:#cc7832}, {color}{color:#6a8759}"udf_ts - INTERVAL '1' second"{color}) .build()) {color:#808080}// the kafka server doesn't need to exist. It fails in the compile stage before fetching data. {color}{color:#808080} {color}.option({color:#6a8759}"properties.bootstrap.servers"{color}{color:#cc7832}, {color}{color:#6a8759}"localhost:9092"{color}) .option({color:#6a8759}"topic"{color}{color:#cc7832}, {color}{color:#6a8759}"test_topic"{color}) .option({color:#6a8759}"format"{color}{color:#cc7832}, {color}{color:#6a8759}"json"{color}) .option({color:#6a8759}"scan.startup.mode"{color}{color:#cc7832}, {color}{color:#6a8759}"latest-offset"{color}) .build()){color:#cc7832}; {color}{color:#cc7832} {color}testTable.printSchema(){color:#cc7832}; {color}{color:#cc7832} {color}tableEnv.createTemporaryView({color:#6a8759}"test"{color}{color:#cc7832}, {color}testTable ){color:#cc7832}; {color}{color:#cc7832} {color}{color:#cc7832} var {color}query = tableEnv.sqlQuery({color:#6a8759}"select * from test"{color}){color:#cc7832}; {color}{color:#cc7832} var {color}tableResult = query.executeInsert(TableDescriptor.forConnector({color:#6a8759}"print"{color}).build()){color:#cc7832}; {color}{color:#cc7832} {color}tableResult.await(){color:#cc7832}; {color}{color:#cc7832} {color}} } {color:#000000}```{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)