你应该用的是最新的版本,flink1.10 之后已经改了操作方式,
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
tEnv.executeSql(“”)
就OK
在2020年09月11日 17:58,me<m...@lishiyu.cn> 写道:
flink sql执行sql语句
SELECT kafka_table.src_ip AS kafka_table_src_ip,COUNT(kafka_table.dest_ip) AS
COUNT_kafka_table_dest_ip_ FROM kafka_table GROUP BY kafka_table.src_ip
直接我发运行,我的初始化环境是
初始化 dataStreamEnv
初始化 tableEnv
1.执行sql
2.执行sql的结果转为datastream
dataStreamEnv.execute("SqlPlatformRealTime")
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot execute.
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)