Andrew Chan created FLINK-28572: ----------------------------------- Summary: FlinkSQL executes Table.execute multiple times on the same Table, and changing the Table.execute code position will produce different phenomena Key: FLINK-28572 URL: https://issues.apache.org/jira/browse/FLINK-28572 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.13.6 Environment: flink-table-planner-blink_2.11
1.13.6 Reporter: Andrew Chan *The following code prints and inserts fine* public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int" + ")with(" + " 'connector' = 'kafka', " + " 'topic' = 's1', " + " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'properties.group.id' = 'atguigu', " + " 'scan.startup.mode' = 'latest-offset', " + " 'format' = 'csv'" + ")"); Table result = tEnv.sqlQuery("select * from sensor"); tEnv.executeSql("create table s_out(" + " id string, " + " ts bigint, " + " vc int" + ")with(" + " 'connector' = 'kafka', " + " 'topic' = 's2', " + " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'format' = 'json', " + " 'sink.partitioner' = 'round-robin' " + ")"); result.executeInsert("s_out"); result.execute().print(); } *When the code that prints this line is moved up, it can be printed normally, but the insert statement is invalid, as follows* public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 1. 通过ddl方式建表(动态表), 与文件关联 tEnv.executeSql("create table sensor(" + " id string, " + " ts bigint, " + " vc int" + ")with(" + " 'connector' = 'kafka', " + " 'topic' = 's1', " + " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'properties.group.id' = 'atguigu', " + " 'scan.startup.mode' = 'latest-offset', " + " 'format' = 'csv'" + ")"); Table result = tEnv.sqlQuery("select * from sensor"); tEnv.executeSql("create table s_out(" + " id string, " + " ts bigint, " + " vc int" + ")with(" + " 'connector' = 'kafka', " + " 'topic' = 's2', " + " 'properties.bootstrap.servers' = 'hadoop162:9092', " + " 'format' = 'json', " + " 'sink.partitioner' = 'round-robin' " + ")"); {color:#FF0000}result.execute().print();{color} {color:#FF0000} result.executeInsert("s_out");{color} } -- This message was sent by Atlassian Jira (v8.20.10#820010)