1.11 对 StreamTableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 的执行方式有所调整, 简单概述为: 1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; 2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; 3. 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业 (异步提交作业),不需要再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
详细可以参考 [1] [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset Best, Godfrey Zhou Zach <wander...@163.com> 于2020年7月8日周三 下午4:19写道: > 去掉就好了,感谢解答 > > > > > > > > > > > > > > > > > > 在 2020-07-08 16:07:17,"Jingsong Li" <jingsongl...@gmail.com> 写道: > >Hi, > > > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > > > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" > >并没有真正的物理节点。你不用再调用了。 > > > >Best, > >Jingsong > > > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach <wander...@163.com> wrote: > > > >> > >> > >> > >> 代码结构改成这样的了: > >> > >> > >> > >> > >> val streamExecutionEnv = > StreamExecutionEnvironment.getExecutionEnvironment > >> > >> val blinkEnvSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >> > >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, > >> blinkEnvSettings) > >> > >> > >> > >> > >> > >> streamExecutionEnv.execute("from kafka sink hbase") > >> > >> > >> > >> > >> 还是报一样的错 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-07-08 15:40:41,"夏帅" <jkill...@dingtalk.com.INVALID> 写道: > >> >你好, > >> >可以看看你的代码结构是不是以下这种 > >> > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > >> > val bsSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build > >> > val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > >> > ...... > >> > tableEnv.execute("") > >> >如果是的话,可以尝试使用bsEnv.execute("") > >> >1.11对于两者的execute代码实现有改动 > >> > > >> > > >> >------------------------------------------------------------------ > >> >发件人:Zhou Zach <wander...@163.com> > >> >发送时间:2020年7月8日(星期三) 15:30 > >> >收件人:Flink user-zh mailing list <user-zh@flink.apache.org> > >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming > topology > >> > > >> >代码在flink > >> > 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: > >> >Exception in thread "main" java.lang.IllegalStateException: No > operators > >> defined in streaming topology. Cannot generate StreamGraph. > >> >at > >> > org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) > >> >at > >> > org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) > >> >at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) > >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) > >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) > >> > > >> > > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 > >> > > >> > > >> > > >> > > >> >query: > >> >streamTableEnv.executeSql( > >> > """ > >> > | > >> > |CREATE TABLE `user` ( > >> > | uid BIGINT, > >> > | sex VARCHAR, > >> > | age INT, > >> > | created_time TIMESTAMP(3), > >> > | WATERMARK FOR created_time as created_time - INTERVAL '3' > >> SECOND > >> > |) WITH ( > >> > | 'connector.type' = 'kafka', > >> > | 'connector.version' = 'universal', > >> > | -- 'connector.topic' = 'user', > >> > | 'connector.topic' = 'user_long', > >> > | 'connector.startup-mode' = 'latest-offset', > >> > | 'connector.properties.group.id' = 'user_flink', > >> > | 'format.type' = 'json', > >> > | 'format.derive-schema' = 'true' > >> > |) > >> > |""".stripMargin) > >> > > >> > > >> > > >> > > >> > > >> > > >> > streamTableEnv.executeSql( > >> > """ > >> > | > >> > |CREATE TABLE user_hbase3( > >> > | rowkey BIGINT, > >> > | cf ROW(sex VARCHAR, age INT, created_time VARCHAR) > >> > |) WITH ( > >> > | 'connector.type' = 'hbase', > >> > | 'connector.version' = '2.1.0', > >> > | 'connector.table-name' = 'user_hbase2', > >> > | 'connector.zookeeper.znode.parent' = '/hbase', > >> > | 'connector.write.buffer-flush.max-size' = '10mb', > >> > | 'connector.write.buffer-flush.max-rows' = '1000', > >> > | 'connector.write.buffer-flush.interval' = '2s' > >> > |) > >> > |""".stripMargin) > >> > > >> > > >> > streamTableEnv.executeSql( > >> > """ > >> > | > >> > |insert into user_hbase3 > >> > |SELECT uid, > >> > | > >> > | ROW(sex, age, created_time ) as cf > >> > | FROM (select uid,sex,age, cast(created_time as VARCHAR) as > >> created_time from `user`) > >> > | > >> > |""".stripMargin) > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > > > > >-- > >Best, Jingsong Lee >