Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
hi, 目前没有解决办法,insert job根据sink表名自动生成job name。 后续解法关注 https://issues.apache.org/jira/browse/FLINK-18545 Weixubin <18925434...@163.com> 于2020年7月23日周四 下午6:07写道: > Hi, > 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 > 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 > 请问有什么解决的方法吗? > > > > > > > > > > > > > > > > > > 在 2020-07-08 16:07:17,"Jingsong Li" 写道: > >Hi, > > > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > > > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" > >并没有真正的物理节点。你不用再调用了。 > > > >Best, > >Jingsong > > > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach wrote: > > > >> > >> > >> > >> 代码结构改成这样的了: > >> > >> > >> > >> > >> val streamExecutionEnv = > StreamExecutionEnvironment.getExecutionEnvironment > >> > >> val blinkEnvSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >> > >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, > >> blinkEnvSettings) > >> > >> > >> > >> > >> > >> streamExecutionEnv.execute("from kafka sink hbase") > >> > >> > >> > >> > >> 还是报一样的错 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-07-08 15:40:41,"夏帅" 写道: > >> >你好, > >> >可以看看你的代码结构是不是以下这种 > >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > >> >val bsSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build > >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > >> > .. > >> >tableEnv.execute("") > >> >如果是的话,可以尝试使用bsEnv.execute("") > >> >1.11对于两者的execute代码实现有改动 > >> > > >> > > >> >-- > >> >发件人:Zhou Zach > >> >发送时间:2020年7月8日(星期三) 15:30 > >> >收件人:Flink user-zh mailing list > >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming > topology > >> > > >> >代码在flink > >> > 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: > >> >Exception in thread "main" java.lang.IllegalStateException: No > operators > >> defined in streaming topology. Cannot generate StreamGraph. > >> >at > >> > org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) > >> >at > >> > org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) > >> >at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) > >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) > >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) > >> > > >> > > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 > >> > > >> > > >> > > >> > > >> >query: > >> >streamTableEnv.executeSql( > >> > """ > >> >| > >> >|CREATE TABLE `user` ( > >> >|uid BIGINT, > >> >|sex VARCHAR, > >> >|age INT, > >> >|created_time TIMESTAMP(3), > >> >|WATERMARK FOR created_time as created_time - INTERVAL '3' > >> SECOND > >> >|) WITH ( > >> >|'connector.type' = 'kafka', > >> >|'connector.version' = 'universal', > >> >|-- 'connector.topic' = 'user', > >> >|'connector.topic' = 'user_long', > >> >|'connector.startup-mode' = 'latest-offset', > >> >|'connector.properties.group.id' = 'user_flink', > >> >|'format.type' = 'json', > >> >|'format.derive-schema' = 'true' > >> >|) > >> >|""".stripMargin) > >> > > >> > > >> > > >> > > >> > > >> > > >> >streamTableEnv.executeSql( > >> > """ > >> >| > >> >|CREATE TABLE user_hbase3( > >> >|rowkey BIGINT, > >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) > >> >|) WITH ( > >> >|'connector.type' = 'hbase', > >> >|'connector.version' = '2.1.0', > >> >|'connector.table-name' = 'user_hbase2', > >> >|'connector.zookeeper.znode.parent' = '/hbase', > >> >|'connector.write.buffer-flush.max-size' = '10mb', > >> >|'connector.write.buffer-flush.max-rows' = '1000', > >> >|'connector.write.buffer-flush.interval' = '2s' > >> >|) > >> >|""".stripMargin) > >> > > >> > > >> >streamTableEnv.executeSql( > >> > """ > >> >| > >> >|insert into user_hbase3 > >> >|SELECT uid, > >> >| > >> >| ROW(sex, age, created_time ) as cf > >> >| FROM (select uid,sex,age, cast(created_time as VARCHAR) as > >> created_time from `user`) > >> >| > >> >|""".stripMargin) > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > >> > > > > > >-- > >Best, Jingsong Lee >
Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
Hi, 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 请问有什么解决的方法吗? 在 2020-07-08 16:07:17,"Jingsong Li" 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" >并没有真正的物理节点。你不用再调用了。 > >Best, >Jingsong > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach wrote: > >> >> >> >> 代码结构改成这样的了: >> >> >> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >> val blinkEnvSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, >> blinkEnvSettings) >> >> >> >> >> >> streamExecutionEnv.execute("from kafka sink hbase") >> >> >> >> >> 还是报一样的错 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-08 15:40:41,"夏帅" 写道: >> >你好, >> >可以看看你的代码结构是不是以下这种 >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >val bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) >> > .. >> >tableEnv.execute("") >> >如果是的话,可以尝试使用bsEnv.execute("") >> >1.11对于两者的execute代码实现有改动 >> > >> > >> >-- >> >发件人:Zhou Zach >> >发送时间:2020年7月8日(星期三) 15:30 >> >收件人:Flink user-zh mailing list >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology >> > >> >代码在flink >> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: >> >Exception in thread "main" java.lang.IllegalStateException: No operators >> defined in streaming topology. Cannot generate StreamGraph. >> >at >> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) >> >at >> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) >> >at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) >> > >> > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 >> > >> > >> > >> > >> >query: >> >streamTableEnv.executeSql( >> > """ >> >| >> >|CREATE TABLE `user` ( >> >|uid BIGINT, >> >|sex VARCHAR, >> >|age INT, >> >|created_time TIMESTAMP(3), >> >|WATERMARK FOR created_time as created_time - INTERVAL '3' >> SECOND >> >|) WITH ( >> >|'connector.type' = 'kafka', >> >|'connector.version' = 'universal', >> >|-- 'connector.topic' = 'user', >> >|'connector.topic' = 'user_long', >> >|'connector.startup-mode' = 'latest-offset', >> >|'connector.properties.group.id' = 'user_flink', >> >|'format.type' = 'json', >> >|'format.derive-schema' = 'true' >> >|) >> >|""".stripMargin) >> > >> > >> > >> > >> > >> > >> >streamTableEnv.executeSql( >> > """ >> >| >> >|CREATE TABLE user_hbase3( >> >|rowkey BIGINT, >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) >> >|) WITH ( >> >|'connector.type' = 'hbase', >> >|'connector.version' = '2.1.0', >> >|'connector.table-name' = 'user_hbase2', >> >|'connector.zookeeper.znode.parent' = '/hbase', >> >|'connector.write.buffer-flush.max-size' = '10mb', >> >|'connector.write.buffer-flush.max-rows' = '1000', >> >|'connector.write.buffer-flush.interval' = '2s' >> >|) >> >|""".stripMargin) >> > >> > >> >streamTableEnv.executeSql( >> > """ >> >| >> >|insert into user_hbase3 >> >|SELECT uid, >> >| >> >| ROW(sex, age, created_time ) as cf >> >| FROM (select uid,sex,age, cast(created_time as VARCHAR) as >> created_time from `user`) >> >| >> >|""".stripMargin) >> > >> > >> > >> > >> > >> > >> > >> > >> > > >-- >Best, Jingsong Lee
Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
这个问题的已经有一个issue:https://issues.apache.org/jira/browse/FLINK-18545,请关注 WeiXubin <18925434...@163.com> 于2020年7月23日周四 下午6:00写道: > Hi, > 我想请问下使用 streamExecutionEnv.execute("from kafka sink > hbase"),通过这种方式可以给Job指定名称。 > 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。 > 请问有什么解决方案吗?谢谢 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
Hi, 我想请问下使用 streamExecutionEnv.execute("from kafka sink hbase"),通过这种方式可以给Job指定名称。 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。 请问有什么解决方案吗?谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
感谢
Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
1.11 对 StreamTableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 的执行方式有所调整, 简单概述为: 1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; 2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; 3. 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业 (异步提交作业),不需要再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute() 详细可以参考 [1] [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset Best, Godfrey Zhou Zach 于2020年7月8日周三 下午4:19写道: > 去掉就好了,感谢解答 > > > > > > > > > > > > > > > > > > 在 2020-07-08 16:07:17,"Jingsong Li" 写道: > >Hi, > > > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > > > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" > >并没有真正的物理节点。你不用再调用了。 > > > >Best, > >Jingsong > > > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach wrote: > > > >> > >> > >> > >> 代码结构改成这样的了: > >> > >> > >> > >> > >> val streamExecutionEnv = > StreamExecutionEnvironment.getExecutionEnvironment > >> > >> val blinkEnvSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >> > >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, > >> blinkEnvSettings) > >> > >> > >> > >> > >> > >> streamExecutionEnv.execute("from kafka sink hbase") > >> > >> > >> > >> > >> 还是报一样的错 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-07-08 15:40:41,"夏帅" 写道: > >> >你好, > >> >可以看看你的代码结构是不是以下这种 > >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > >> >val bsSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build > >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > >> > .. > >> >tableEnv.execute("") > >> >如果是的话,可以尝试使用bsEnv.execute("") > >> >1.11对于两者的execute代码实现有改动 > >> > > >> > > >> >-- > >> >发件人:Zhou Zach > >> >发送时间:2020年7月8日(星期三) 15:30 > >> >收件人:Flink user-zh mailing list > >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming > topology > >> > > >> >代码在flink > >> > 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: > >> >Exception in thread "main" java.lang.IllegalStateException: No > operators > >> defined in streaming topology. Cannot generate StreamGraph. > >> >at > >> > org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) > >> >at > >> > org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) > >> >at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) > >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) > >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) > >> > > >> > > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 > >> > > >> > > >> > > >> > > >> >query: > >> >streamTableEnv.executeSql( > >> > """ > >> >| > >> >|CREATE TABLE `user` ( > >> >|uid BIGINT, > >> >|sex VARCHAR, > >> >|age INT, > >> >|created_time TIMESTAMP(3), > >> >|WATERMARK FOR created_time as created_time - INTERVAL '3' > >> SECOND > >> >|) WITH ( > >> >|'connector.type' = 'kafka', > >> >|'connector.version' = 'universal', > >> >|-- 'connector.topic' = 'user', > >> >|'connector.topic' = 'user_long', > >> >|'connector.startup-mode' = 'latest-offset', > >> >|'connector.properties.group.id' = 'user_flink', > >> >|'format.type' = 'json', > >> >|'format.derive-schema' = 'true' > >> >|) > >> >|""".stripMargin) > >> > > >> > > >> > > >> > > >> > > >> > > >> >streamTableEnv.executeSql( > >> > """ > >> >| > >> >|CREATE TABLE user_hbase3( > >> >|rowkey BIGINT, > >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) > >> >|) WITH ( > >> >|'connector.type' = 'hbase', > >> >|'connector.version' = '2.1.0', > >> >|'connector.table-name' = 'user_hbase2', > >> >|'connector.zookeeper.znode.parent' = '/hbase', > >> >|'connector.write.buffer-flush.max-size' = '10mb', > >> >|'connector.write.buffer-flush.max-rows' = '1000', > >> >|'connector.write.buffer-flush.interval' = '2s' > >> >|) > >> >|""".stripMargin) > >> > > >> > > >> >streamTableEnv.executeSql( > >> > """ > >> >| > >> >|insert into user_hbase3 > >> >|SELECT uid, > >> >| > >> >| ROW(sex, age, created_time ) as cf > >> >| FROM
Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
去掉就好了,感谢解答 在 2020-07-08 16:07:17,"Jingsong Li" 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" >并没有真正的物理节点。你不用再调用了。 > >Best, >Jingsong > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach wrote: > >> >> >> >> 代码结构改成这样的了: >> >> >> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >> val blinkEnvSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, >> blinkEnvSettings) >> >> >> >> >> >> streamExecutionEnv.execute("from kafka sink hbase") >> >> >> >> >> 还是报一样的错 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-08 15:40:41,"夏帅" 写道: >> >你好, >> >可以看看你的代码结构是不是以下这种 >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >val bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) >> > .. >> >tableEnv.execute("") >> >如果是的话,可以尝试使用bsEnv.execute("") >> >1.11对于两者的execute代码实现有改动 >> > >> > >> >-- >> >发件人:Zhou Zach >> >发送时间:2020年7月8日(星期三) 15:30 >> >收件人:Flink user-zh mailing list >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology >> > >> >代码在flink >> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: >> >Exception in thread "main" java.lang.IllegalStateException: No operators >> defined in streaming topology. Cannot generate StreamGraph. >> >at >> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) >> >at >> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) >> >at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) >> > >> > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 >> > >> > >> > >> > >> >query: >> >streamTableEnv.executeSql( >> > """ >> >| >> >|CREATE TABLE `user` ( >> >|uid BIGINT, >> >|sex VARCHAR, >> >|age INT, >> >|created_time TIMESTAMP(3), >> >|WATERMARK FOR created_time as created_time - INTERVAL '3' >> SECOND >> >|) WITH ( >> >|'connector.type' = 'kafka', >> >|'connector.version' = 'universal', >> >|-- 'connector.topic' = 'user', >> >|'connector.topic' = 'user_long', >> >|'connector.startup-mode' = 'latest-offset', >> >|'connector.properties.group.id' = 'user_flink', >> >|'format.type' = 'json', >> >|'format.derive-schema' = 'true' >> >|) >> >|""".stripMargin) >> > >> > >> > >> > >> > >> > >> >streamTableEnv.executeSql( >> > """ >> >| >> >|CREATE TABLE user_hbase3( >> >|rowkey BIGINT, >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) >> >|) WITH ( >> >|'connector.type' = 'hbase', >> >|'connector.version' = '2.1.0', >> >|'connector.table-name' = 'user_hbase2', >> >|'connector.zookeeper.znode.parent' = '/hbase', >> >|'connector.write.buffer-flush.max-size' = '10mb', >> >|'connector.write.buffer-flush.max-rows' = '1000', >> >|'connector.write.buffer-flush.interval' = '2s' >> >|) >> >|""".stripMargin) >> > >> > >> >streamTableEnv.executeSql( >> > """ >> >| >> >|insert into user_hbase3 >> >|SELECT uid, >> >| >> >| ROW(sex, age, created_time ) as cf >> >| FROM (select uid,sex,age, cast(created_time as VARCHAR) as >> created_time from `user`) >> >| >> >|""".stripMargin) >> > >> > >> > >> > >> > >> > >> > >> > >> > > >-- >Best, Jingsong Lee
Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
Hi, 你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" 并没有真正的物理节点。你不用再调用了。 Best, Jingsong On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach wrote: > > > > 代码结构改成这样的了: > > > > > val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment > > val blinkEnvSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > > val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, > blinkEnvSettings) > > > > > > streamExecutionEnv.execute("from kafka sink hbase") > > > > > 还是报一样的错 > > > > > > > > > > > > 在 2020-07-08 15:40:41,"夏帅" 写道: > >你好, > >可以看看你的代码结构是不是以下这种 > >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > >val bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build > >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > > .. > >tableEnv.execute("") > >如果是的话,可以尝试使用bsEnv.execute("") > >1.11对于两者的execute代码实现有改动 > > > > > >-- > >发件人:Zhou Zach > >发送时间:2020年7月8日(星期三) 15:30 > >收件人:Flink user-zh mailing list > >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology > > > >代码在flink > 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: > >Exception in thread "main" java.lang.IllegalStateException: No operators > defined in streaming topology. Cannot generate StreamGraph. > >at > org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) > >at > org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) > >at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) > >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) > >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) > > > > > >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 > > > > > > > > > >query: > >streamTableEnv.executeSql( > > """ > >| > >|CREATE TABLE `user` ( > >|uid BIGINT, > >|sex VARCHAR, > >|age INT, > >|created_time TIMESTAMP(3), > >|WATERMARK FOR created_time as created_time - INTERVAL '3' > SECOND > >|) WITH ( > >|'connector.type' = 'kafka', > >|'connector.version' = 'universal', > >|-- 'connector.topic' = 'user', > >|'connector.topic' = 'user_long', > >|'connector.startup-mode' = 'latest-offset', > >|'connector.properties.group.id' = 'user_flink', > >|'format.type' = 'json', > >|'format.derive-schema' = 'true' > >|) > >|""".stripMargin) > > > > > > > > > > > > > >streamTableEnv.executeSql( > > """ > >| > >|CREATE TABLE user_hbase3( > >|rowkey BIGINT, > >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) > >|) WITH ( > >|'connector.type' = 'hbase', > >|'connector.version' = '2.1.0', > >|'connector.table-name' = 'user_hbase2', > >|'connector.zookeeper.znode.parent' = '/hbase', > >|'connector.write.buffer-flush.max-size' = '10mb', > >|'connector.write.buffer-flush.max-rows' = '1000', > >|'connector.write.buffer-flush.interval' = '2s' > >|) > >|""".stripMargin) > > > > > >streamTableEnv.executeSql( > > """ > >| > >|insert into user_hbase3 > >|SELECT uid, > >| > >| ROW(sex, age, created_time ) as cf > >| FROM (select uid,sex,age, cast(created_time as VARCHAR) as > created_time from `user`) > >| > >|""".stripMargin) > > > > > > > > > > > > > > > > > -- Best, Jingsong Lee
Re:回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
代码结构改成这样的了: val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings) streamExecutionEnv.execute("from kafka sink hbase") 还是报一样的错 在 2020-07-08 15:40:41,"夏帅" 写道: >你好, >可以看看你的代码结构是不是以下这种 >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >val bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > .. >tableEnv.execute("") >如果是的话,可以尝试使用bsEnv.execute("") >1.11对于两者的execute代码实现有改动 > > >-- >发件人:Zhou Zach >发送时间:2020年7月8日(星期三) 15:30 >收件人:Flink user-zh mailing list >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology > >代码在flink >1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: >Exception in thread "main" java.lang.IllegalStateException: No operators >defined in streaming topology. Cannot generate StreamGraph. >at >org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) >at >org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) >at >org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) > > >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 > > > > >query: >streamTableEnv.executeSql( > """ >| >|CREATE TABLE `user` ( >|uid BIGINT, >|sex VARCHAR, >|age INT, >|created_time TIMESTAMP(3), >|WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND >|) WITH ( >|'connector.type' = 'kafka', >|'connector.version' = 'universal', >|-- 'connector.topic' = 'user', >|'connector.topic' = 'user_long', >|'connector.startup-mode' = 'latest-offset', >|'connector.properties.group.id' = 'user_flink', >|'format.type' = 'json', >|'format.derive-schema' = 'true' >|) >|""".stripMargin) > > > > > > >streamTableEnv.executeSql( > """ >| >|CREATE TABLE user_hbase3( >|rowkey BIGINT, >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) >|) WITH ( >|'connector.type' = 'hbase', >|'connector.version' = '2.1.0', >|'connector.table-name' = 'user_hbase2', >|'connector.zookeeper.znode.parent' = '/hbase', >|'connector.write.buffer-flush.max-size' = '10mb', >|'connector.write.buffer-flush.max-rows' = '1000', >|'connector.write.buffer-flush.interval' = '2s' >|) >|""".stripMargin) > > >streamTableEnv.executeSql( > """ >| >|insert into user_hbase3 >|SELECT uid, >| >| ROW(sex, age, created_time ) as cf >| FROM (select uid,sex,age, cast(created_time as VARCHAR) as > created_time from `user`) >| >|""".stripMargin) > > > > > > > >
回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
你好, 可以看看你的代码结构是不是以下这种 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) .. tableEnv.execute("") 如果是的话,可以尝试使用bsEnv.execute("") 1.11对于两者的execute代码实现有改动 -- 发件人:Zhou Zach 发送时间:2020年7月8日(星期三) 15:30 收件人:Flink user-zh mailing list 主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology 代码在flink 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) 但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 query: streamTableEnv.executeSql( """ | |CREATE TABLE `user` ( |uid BIGINT, |sex VARCHAR, |age INT, |created_time TIMESTAMP(3), |WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND |) WITH ( |'connector.type' = 'kafka', |'connector.version' = 'universal', |-- 'connector.topic' = 'user', |'connector.topic' = 'user_long', |'connector.startup-mode' = 'latest-offset', |'connector.properties.group.id' = 'user_flink', |'format.type' = 'json', |'format.derive-schema' = 'true' |) |""".stripMargin) streamTableEnv.executeSql( """ | |CREATE TABLE user_hbase3( |rowkey BIGINT, |cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |) WITH ( |'connector.type' = 'hbase', |'connector.version' = '2.1.0', |'connector.table-name' = 'user_hbase2', |'connector.zookeeper.znode.parent' = '/hbase', |'connector.write.buffer-flush.max-size' = '10mb', |'connector.write.buffer-flush.max-rows' = '1000', |'connector.write.buffer-flush.interval' = '2s' |) |""".stripMargin) streamTableEnv.executeSql( """ | |insert into user_hbase3 |SELECT uid, | | ROW(sex, age, created_time ) as cf | FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from `user`) | |""".stripMargin)