[jira] [Comment Edited] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835457#comment-17835457 ] Sergey Nuyanzin edited comment on FLINK-28693 at 4/9/24 6:02 PM: - 1.18: [f5c62abf7475ea8bc976de2a2079b1a9e29b79df|https://github.com/apache/flink/commit/f5c62abf7475ea8bc976de2a2079b1a9e29b79df] 1.19: [b1165a89edb9857754e283c6afd7983a34acd465|https://github.com/apache/flink/commit/b1165a89edb9857754e283c6afd7983a34acd465] was (Author: sergey nuyanzin): 1.18: [f5c62abf7475ea8bc976de2a2079b1a9e29b79df|https://github.com/apache/flink/commit/f5c62abf7475ea8bc976de2a2079b1a9e29b79df] > Codegen failed if the watermark is defined on a columnByExpression > -- > > Key: FLINK-28693 > URL: https://issues.apache.org/jira/browse/FLINK-28693 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.1 >Reporter: Hongbo >Assignee: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > The following code will throw an exception: > > {code:java} > Table program cannot be compiled. This is a bug. Please file an issue. > ... > Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column > 54: Cannot determine simple type name "org" {code} > {color:#00}Code:{color} > {code:java} > public class TestUdf extends ScalarFunction { > @DataTypeHint("TIMESTAMP(3)") > public LocalDateTime eval(String strDate) { >return LocalDateTime.now(); > } > } > public class FlinkTest { > @Test > void testUdf() throws Exception { > //var env = StreamExecutionEnvironment.createLocalEnvironment(); > // run `gradlew shadowJar` first to generate the uber jar. > // It contains the kafka connector and a dummy UDF function. > var env = > StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, > "build/libs/flink-test-all.jar"); > env.setParallelism(1); > var tableEnv = StreamTableEnvironment.create(env); > tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class); > var testTable = tableEnv.from(TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("time_stamp", DataTypes.STRING()) > .columnByExpression("udf_ts", "TEST_UDF(time_stamp)") > .watermark("udf_ts", "udf_ts - INTERVAL '1' second") > .build()) > // the kafka server doesn't need to exist. It fails in the > compile stage before fetching data. > .option("properties.bootstrap.servers", "localhost:9092") > .option("topic", "test_topic") > .option("format", "json") > .option("scan.startup.mode", "latest-offset") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var query = tableEnv.sqlQuery("select * from test"); > var tableResult = > query.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } > }{code} > What does the code do? > # read a stream from Kakfa > # create a derived column using an UDF expression > # define the watermark based on the derived column > The full callstack: > > {code:java} > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) > ~[flink-dist-1.15.1.jar:1.15.1] > at >
[jira] [Comment Edited] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571487#comment-17571487 ] Hongbo edited comment on FLINK-28693 at 7/26/22 5:49 PM: - Swaping flink-table-planner-loader with flink-table-planer in the opt/ folder can solve the problem. Is there any drawback to the swap? (If not, why use flink-table-planner-loader as the default, as I saw others also reported problems about this lib). was (Author: liuhb86): Swapping replacing flink-table-planner-loader with flink-table-planer in the opt/ folder can solve the problem. Is there any drawback to the swap? (If not, why use flink-table-planner-loader as the default, as I saw others also reported problems about this lib). > Codegen failed if the watermark is defined on a columnByExpression > -- > > Key: FLINK-28693 > URL: https://issues.apache.org/jira/browse/FLINK-28693 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.1 >Reporter: Hongbo >Priority: Major > > The following code will throw an exception: > > {code:java} > Table program cannot be compiled. This is a bug. Please file an issue. > ... > Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column > 54: Cannot determine simple type name "org" {code} > {color:#00}Code:{color} > {code:java} > public class TestUdf extends ScalarFunction { > @DataTypeHint("TIMESTAMP(3)") > public LocalDateTime eval(String strDate) { >return LocalDateTime.now(); > } > } > public class FlinkTest { > @Test > void testUdf() throws Exception { > //var env = StreamExecutionEnvironment.createLocalEnvironment(); > // run `gradlew shadowJar` first to generate the uber jar. > // It contains the kafka connector and a dummy UDF function. > var env = > StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, > "build/libs/flink-test-all.jar"); > env.setParallelism(1); > var tableEnv = StreamTableEnvironment.create(env); > tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class); > var testTable = tableEnv.from(TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("time_stamp", DataTypes.STRING()) > .columnByExpression("udf_ts", "TEST_UDF(time_stamp)") > .watermark("udf_ts", "udf_ts - INTERVAL '1' second") > .build()) > // the kafka server doesn't need to exist. It fails in the > compile stage before fetching data. > .option("properties.bootstrap.servers", "localhost:9092") > .option("topic", "test_topic") > .option("format", "json") > .option("scan.startup.mode", "latest-offset") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var query = tableEnv.sqlQuery("select * from test"); > var tableResult = > query.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } > }{code} > What does the code do? > # read a stream from Kakfa > # create a derived column using an UDF expression > # define the watermark based on the derived column > The full callstack: > > {code:java} > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) > ~[flink-dist-1.15.1.jar:1.15.1] > at >