Andrew Chan created FLINK-28572:
-----------------------------------

             Summary: Flink SQL: executing the same Table more than once
(Table.execute / Table.executeInsert) behaves differently depending on the
order of the calls
                 Key: FLINK-28572
                 URL: https://issues.apache.org/jira/browse/FLINK-28572
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Planner
    Affects Versions: 1.13.6
         Environment: flink-table-planner-blink_2.11  

1.13.6
            Reporter: Andrew Chan


*The following code works as expected: the rows are printed and are also inserted into the sink table*

/**
 * Reproduction case 1: insert first, then print.
 *
 * Registers a Kafka-backed source table {@code sensor} and a Kafka-backed sink table
 * {@code s_out}, then uses the same {@code Table} twice: once with
 * {@code executeInsert("s_out")} and once with {@code execute().print()}.
 * According to the report, both the insert and the print work with this ordering.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
// Set the "rest.port" option to 2000 and hand the configuration to the environment.
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Source table: CSV records read from Kafka topic 's1', starting at the latest offset.
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");
// The single Table object that is executed twice below.
Table result = tEnv.sqlQuery("select * from sensor");
// Sink table: JSON records written to Kafka topic 's2' with a round-robin partitioner.
tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
// Order under test: the insert is issued BEFORE the print.
result.executeInsert("s_out");
result.execute().print();
}

 

*When the print statement is moved above the insert statement, the rows are
still printed normally, but the insert statement no longer takes effect, as
follows*

public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 1. 通过ddl方式建表(动态表), 与文件关联
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");


Table result = tEnv.sqlQuery("select * from sensor");

tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
{color:#FF0000}result.execute().print();{color}
{color:#FF0000} result.executeInsert("s_out");{color}
}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to