Re: Error when executing a SQL statement with Flink SQL - No operators defined in streaming topology. Cannot execute.
1. The Flink version is 1.11.1:
streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamBlinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
streamTableEnv = StreamTableEnvironment.create(streamEnv, streamBlinkSettings)
2. After executing the SQL I need to convert the result to a DataStream, so at the end I call dataStreamEnv.execute("SqlPlatformRealTime"); the result Table of the SQL is converted to a DataStream and then written to Kafka via addSink.

Original message
From: silence
To: user-zh
Sent: Friday, September 11, 2020, 18:49
Subject: Re: Error when executing a SQL statement with Flink SQL - No operators defined in streaming topology. Cannot execute.
Without an INSERT statement there is no sink, so no computation is triggered.
-- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Error when executing a SQL statement with Flink SQL - No operators defined in streaming topology. Cannot execute.
Without an INSERT statement there is no sink, so no computation is triggered.
-- Sent from: http://apache-flink.147419.n8.nabble.com/
Reply: Error when executing a SQL statement with Flink SQL - No operators defined in streaming topology. Cannot execute.
You are probably on a recent version; since Flink 1.10 the way to run SQL has changed:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
tEnv.executeSql("")

and that is all you need.

On September 11, 2020, 17:58, me wrote:
Executing the Flink SQL statement
SELECT kafka_table.src_ip AS kafka_table_src_ip, COUNT(kafka_table.dest_ip) AS COUNT_kafka_table_dest_ip_ FROM kafka_table GROUP BY kafka_table.src_ip
fails outright. My environment setup is:
initialize dataStreamEnv
initialize tableEnv
1. execute the SQL
2. convert the SQL result to a DataStream
dataStreamEnv.execute("SqlPlatformRealTime")

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
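The underlying issue in this thread is that a bare SELECT registers no sink in the StreamExecutionEnvironment, so dataStreamEnv.execute() sees an empty topology. A minimal sketch of the two ways to trigger execution on Flink 1.11 (sink names and the Kafka producer arguments are illustrative, not from the original job):

```java
// Option 1: let the Table API drive execution with an INSERT; no env.execute() is
// needed, because executeSql() on an INSERT submits the job itself.
tEnv.executeSql(
    "INSERT INTO kafka_sink " +
    "SELECT src_ip, COUNT(dest_ip) FROM kafka_table GROUP BY src_ip");

// Option 2: convert the query result to a DataStream, add a sink, then execute.
// An aggregating query produces updates, so use toRetractStream, not toAppendStream;
// each element is a Tuple2<Boolean, Row> where the flag marks insert vs. retract.
Table result = tEnv.sqlQuery(
    "SELECT src_ip, COUNT(dest_ip) FROM kafka_table GROUP BY src_ip");
tEnv.toRetractStream(result, Row.class)
    .addSink(new FlinkKafkaProducer<>(/* topic, serialization schema, properties */));
env.execute("SqlPlatformRealTime");  // now the topology contains an operator chain
```

With option 2, execute() must be called on the same StreamExecutionEnvironment that the StreamTableEnvironment was created from, otherwise the sink lands in a different (empty) topology.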
Error when executing a SQL statement with Flink SQL - No operators defined in streaming topology. Cannot execute.
Executing the Flink SQL statement
SELECT kafka_table.src_ip AS kafka_table_src_ip, COUNT(kafka_table.dest_ip) AS COUNT_kafka_table_dest_ip_ FROM kafka_table GROUP BY kafka_table.src_ip
fails outright. My environment setup is:
initialize dataStreamEnv
initialize tableEnv
1. execute the SQL
2. convert the SQL result to a DataStream
dataStreamEnv.execute("SqlPlatformRealTime")

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
Several problems encountered while testing flink cdc; quite puzzled, any help is appreciated
1. After a checkpoint, restoring from it fails with:
org.apache.kafka.connect.errors.ConnectException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1599815908000, eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19, dataLength=25879, nextPosition=721073164, flags=0}

2. After reading data with flink cdc, performing a join (loading a dimension table), and writing to MySQL, the parallelism cannot be raised; it stays at 1. I have already made the corresponding settings in the configuration file, including for sql-client:
taskmanager.numberOfTaskSlots: 5
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 5

My SQL is:
Insert into orders Select * from order o join sku s FOR SYSTEM_TIME as of o.proc_time s on o.sku_id = s.id

Thanks in advance for any replies.
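As an aside on the statement in question 2: in the documented Flink temporal-join syntax the alias comes after the FOR SYSTEM_TIME clause, not before it. A sketch with the table names from the question (whether this is the cause of the parallelism issue is a separate matter; this only shows the standard shape of the join):

```sql
INSERT INTO orders
SELECT *
FROM `order` AS o
JOIN sku FOR SYSTEM_TIME AS OF o.proc_time AS s
  ON o.sku_id = s.id;
```

Note also that `order` is a reserved word in SQL and generally needs to be quoted with backticks in Flink SQL.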
Re: Flink 1.10 on Yarn
@Congxian Qiu Sorry, only just saw this. We never hit this problem on flink 1.7. After upgrading to flink 1.10 it reproduces every time, though the timing varies.

On 8/9/2020 15:00, Congxian Qiu wrote:
Hi xuhaiLong, does this job hit the NPE deterministically on this version? Also, did the problem ever occur on versions before 1.10?
Best, Congxian

xuhaiLong wrote on Friday, August 7, 2020, 15:14:
Thanks for the reply! It is indeed caused by this bug.

On 8/7/2020 13:43, chenkaibit wrote:
hi xuhaiLong, the checkpoint NullPointerException in the log is a known issue; see the two JIRA tickets below. Which JDK version are you using? So far, jdk8_40/jdk8_60 + flink-1.10 is known to produce the checkpoint NullPointerException; try upgrading the JDK.
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479

On 2020-08-07 12:50:23, "xuhaiLong" wrote:
Sorry, I attached the wrong file. Yes, taskmanager.memory.jvm-metaspace.size is at the default.

On 8/7/2020 11:43, Yangze Guo wrote:
The log attachment did not come through; is taskmanager.memory.jvm-metaspace.size currently at the default value?
Best, Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong wrote:
Hi,
Scenario: one TM with three slots, running three jobs. While running, the three jobs hit a NullPointerException during checkpointing, which caused the jobs to restart continuously; this eventually filled up Metaspace and produced:
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak which has to be investigated and fixed. The task executor has to be shutdown... `
Part of the exception is in the attachment.
Questions:
1. Why does the NullPointerException occur during checkpointing? (The three jobs read the same Kafka topic, and restoring a job from the checkpoint works fine, so it should not be a data problem.)
2. The logs show the jobs can restart, so why does the problem persist after the automatic restarts, leading to an endless restart loop?
Thanks!
Cloud attachment from NetEase Mail: 08-07error.txt (730.4 KB, expires 2020-08-22 11:37) Download
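For the Metaspace exhaustion mentioned above: the root cause here is the restart loop, but while that is being fixed, the option named in the error message can be raised in flink-conf.yaml. A config sketch; the value below is an illustrative guess, not a recommendation from the thread:

```yaml
# flink-conf.yaml: enlarge the TaskManager JVM metaspace so repeated job
# (re-)submissions with fresh user classloaders do not exhaust it as quickly.
taskmanager.memory.jvm-metaspace.size: 256m
```

This only buys headroom; if there is a genuine class-loading leak, Metaspace will still fill up eventually, as the error message itself points out.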
localtimestamp and current_timestamp written to MySQL give inconsistent values
At 2020-09-09 15:25:55.416 the written results were:

local_dtm | curr_dtm | local_dtm_no_zone | curr_dtm_no_zone
2020-09-09 02:25:55.416 | 2020-09-08 18:25:55.416 | 2020-09-09 02:25:55.416 | 2020-09-08 18:25:55.416

MySQL sink table:
CREATE TABLE `sink2` (
  `local_dtm` varchar(100) DEFAULT NULL,
  `curr_dtm` varchar(100) DEFAULT NULL,
  `local_dtm_no_zone` varchar(100) DEFAULT NULL,
  `curr_dtm_no_zone` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Flink table and insert:
drop table if exists sk;
CREATE TABLE sk (
  local_dtm TIMESTAMP,
  curr_dtm TIMESTAMP,
  local_dtm_no_zone TIMESTAMP WITHOUT TIME ZONE,
  curr_dtm_no_zone TIMESTAMP WITHOUT TIME ZONE
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.12.5.37:3306/rs_report?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'sink2',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'dps',
  'password' = 'dps1234'
);
insert into sk values(localtimestamp, current_timestamp, localtimestamp, current_timestamp);

From: "xuzh" <443435...@qq.com>
Sent: Thursday, September 10, 2020, 10:01
To: "xbjtdcq"
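One factor behind results like the above is that LOCALTIMESTAMP is evaluated in the session's local time zone, which Flink lets you pin explicitly; pre-1.13 versions of Flink are also known to have inconsistent CURRENT_TIMESTAMP semantics (later reworked by FLIP-162). A sketch of setting the session time zone (the zone ID is an assumed example, and the surrounding environment setup is elided):

```java
import java.time.ZoneId;

// tEnv is whatever TableEnvironment the job already builds.
// Pin the session time zone so LOCALTIMESTAMP is evaluated in a known zone
// instead of whatever the JVM default happens to be on the cluster machines.
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
```

With the zone pinned, the local and the UTC-based columns should at least differ by a fixed, explainable offset rather than by whatever the worker's OS time zone is.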
Flink dynamic partitioning strategy
When running Flink streaming jobs, we often see a few servers with high CPU usage (the cluster is shared with many other components), so the operators scheduled on those machines have far higher latency than operators on other machines. Does Flink have a dynamic partitioning strategy or a TaskManager migration strategy, something like Spark's handling of slow tasks, where the master launches a duplicate of a slow operator and, if the duplicate finishes first, the job continues downstream from its output? Thanks, everyone.
Flink SQL permissions
Does Flink SQL support permission checks on metadata? For example, when using the Hive catalog, does it support Hive's permission checks? If this is not supported today, is it being considered for the future?