Weighing ~ tEnv.executeSql would execute the SQL asynchronously, e.g. submitting a job to the backend cluster with a builtin job name, the tEnv.executeSql itself did return a JobResult immediately with a constant affected rows count -1.
Best, Danny Chan 在 2020年8月13日 +0800 PM3:46,Lu Weizheng <luweizhen...@hotmail.com>,写道: > Thanks Timo, > > So no need to use execute() method in Flink SQL If I do all the thins from > source to sink in SQL. > > Best Regards, > Lu > > > 2020年8月13日 下午3:41,Timo Walther <twal...@apache.org> 写道: > > > > Hi Lu, > > > > `env.execute("table api");` is not necessary after FLIP-84 [1]. Every > > method that has `execute` in its name will immediately execute a job. > > Therefore your `env.execute` has an empty pipeline. > > > > Regards, > > Timo > > > > [1] > > https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 > > > > On 13.08.20 09:34, Lu Weizheng wrote: > > > Hi, > > > I am using Flink 1.11 SQL using java. All my operations are in SQL. I > > > create source tables and insert result into sink tables. No other Java > > > operators. I execute it in Intellij. I can get the final result in the > > > sink tables. However I get the following error. I am not sure it is a bug > > > or there is something wrong in my code? Acutally it does not affect the > > > computation. > > > /Exception in thread "main" java.lang.IllegalStateException: No operators > > > defined in streaming topology. Cannot execute./ > > > /at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/ > > > /at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/ > > > /at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/ > > > /at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/ > > > /at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/ > > > Here's my code: > > > EnvironmentSettings fsSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > > > StreamExecutionEnvironment env = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, > > > fsSettings); > > > // create source and sink tables... > > > tEnv.executeSql("INSERT INTO sensor_1min_avg " + > > > "SELECT " + > > > " room, " + > > > " AVG(temp) AS avg_temp," + > > > " TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " + > > > "FROM sensor " + > > > "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)"); > > > env.execute("table api"); > > >