Re: Flink SQL statement fails to execute - No operators defined in streaming topology. Cannot execute.

2020-09-11, from me
1. The Flink version is 1.11.1:
streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamBlinkSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
streamTableEnv = StreamTableEnvironment.create(streamEnv, streamBlinkSettings)
2. After running the SQL I need to convert the result to a DataStream, so at the end I call dataStreamEnv.execute("SqlPlatformRealTime").
The result Table of the SQL is converted to a DataStream and then written to Kafka via addSink.


Original message
From: silence
To: user-zh
Date: Friday, September 11, 2020, 18:49
Subject: Re: Flink SQL statement fails to execute - No operators defined in streaming topology.
Cannot execute.


Without an INSERT statement there is no sink, so nothing triggers the computation. -- Sent from:
http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL statement fails to execute - No operators defined in streaming topology. Cannot execute.

2020-09-11, from silence
Without an INSERT statement there is no sink, so nothing triggers the computation.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL statement fails to execute - No operators defined in streaming topology. Cannot execute.

2020-09-11, from 引领

You are presumably on a recent version; the API usage changed after Flink 1.10:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.executeSql("")

and that's it.



On 2020-09-11 17:58, me wrote:
Executing this Flink SQL statement:
SELECT kafka_table.src_ip AS kafka_table_src_ip, COUNT(kafka_table.dest_ip) AS
COUNT_kafka_table_dest_ip_ FROM kafka_table GROUP BY kafka_table.src_ip
fails to run. My environment setup is:
initialize dataStreamEnv
initialize tableEnv
1. execute the SQL
2. convert the SQL result to a DataStream
dataStreamEnv.execute("SqlPlatformRealTime")


Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)

Flink SQL statement fails to execute - No operators defined in streaming topology. Cannot execute.

2020-09-11, from me
Executing this Flink SQL statement:
SELECT kafka_table.src_ip AS kafka_table_src_ip, COUNT(kafka_table.dest_ip) AS
COUNT_kafka_table_dest_ip_ FROM kafka_table GROUP BY kafka_table.src_ip
fails to run. My environment setup is:
initialize dataStreamEnv
initialize tableEnv
1. execute the SQL
2. convert the SQL result to a DataStream
dataStreamEnv.execute("SqlPlatformRealTime")


Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)

Several puzzling issues hit while testing Flink CDC - asking for help

2020-09-11, from 引领

1. After a checkpoint, restoring from it fails with:
org.apache.kafka.connect.errors.ConnectException:
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
Failed to deserialize data of EventHeaderV4{timestamp=1599815908000, eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19, dataLength=25879, nextPosition=721073164, flags=0}
2. After Flink CDC reads the data, performs a join (loading a dimension table), and writes to MySQL, the parallelism cannot be raised - it stays at 1.
I have already made the corresponding settings in the configuration files, including for sql-client:
taskmanager.numberOfTaskSlots: 5
parallelism.default: 5


My SQL is:

INSERT INTO orders
SELECT * FROM `order` o
JOIN sku FOR SYSTEM_TIME AS OF o.proc_time AS s
ON o.sku_id = s.id

Thanks in advance for any replies.







Re: Flink 1.10 on Yarn

2020-09-11, from xuhaiLong
@Congxian Qiu Sorry, only just saw this.

We never hit this problem on Flink 1.7. After upgrading to Flink 1.10 it always occurs, though at unpredictable times.


On 8/9/2020 15:00, Congxian Qiu wrote:
Hi xuhaiLong
Does this job always hit the NPE on this version? And did the problem ever occur on versions before 1.10?
Best,
Congxian


xuhaiLong wrote on Fri, Aug 7, 2020 at 3:14 PM:

Thanks for the reply! It was indeed caused by this bug.


On 8/7/2020 13:43, chenkaibit wrote:
hi xuhaiLong, the checkpoint NullPointerException in your log is a known issue; see the two JIRA tickets below.
Which JDK version are you on? We have seen jdk8_40/jdk8_60 + flink-1.10 hit the checkpoint
NullPointerException; try upgrading the JDK.
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479




On 2020-08-07 12:50:23, "xuhaiLong" wrote:

sorry, I attached the wrong file

Yes, taskmanager.memory.jvm-metaspace.size is at the default.
On 8/7/2020 11:43, Yangze Guo wrote:
The log attachment didn't come through. Is taskmanager.memory.jvm-metaspace.size currently at its default?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:



Hi


Scenario: one TaskManager with three slots, running three jobs.

While the three jobs were running, an NPE occurred during checkpointing, causing the jobs to restart continuously. This eventually filled up Metaspace with:
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
which has to be investigated and fixed. The task executor has to be
shutdown...
`

See the attachment for part of the exception.

Questions:
1. Why does the NPE occur during checkpointing? (The three jobs consume the same Kafka topic, and jobs restored from the checkpoint run fine, so it is probably not a data issue.)
2. The logs show the restarts succeed, so why does the problem recur after every automatic restart, leaving the jobs restarting forever?


Thanks~~~
Cloud attachment sent via NetEase Mail: 08-07error.txt (730.4 KB, available until 2020-08-22 11:37)



localtimestamp / current_timestamp written to MySQL

2020-09-11, from xuzh
1. The TIMESTAMP values are written into VARCHAR columns.
2. Submitted at 2020-09-09 15:25:55.416; the results were:

local_dtm               | curr_dtm                | local_dtm_no_zone       | curr_dtm_no_zone
2020-09-09 02:25:55.416 | 2020-09-08 18:25:55.416 | 2020-09-09 02:25:55.416 | 2020-09-08 18:25:55.416



CREATE TABLE `sink2` (
 `local_dtm` varchar(100) DEFAULT NULL,
 `curr_dtm` varchar(100) DEFAULT NULL,
 `local_dtm_no_zone` varchar(100) DEFAULT NULL,
 `curr_dtm_no_zone` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8


drop table if exists sk;
CREATE TABLE sk (   
local_dtm TIMESTAMP,
curr_dtm TIMESTAMP,
local_dtm_no_zone  TIMESTAMP WITHOUT TIME ZONE,
curr_dtm_no_zone  TIMESTAMP WITHOUT TIME ZONE
)
WITH (
  'connector' = 'jdbc',
  'url' =
'jdbc:mysql://10.12.5.37:3306/rs_report?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'sink2',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'dps',
  'password' = 'dps1234'
);


insert into sk values(localtimestamp,current_timestamp,localtimestamp   
,current_timestamp);




---- Original message ----
From: "xuzh" <443435...@qq.com>
Date: Thursday, September 10, 2020, 10:01
To: "xbjtdcq"

Re: localtimestamp / current_timestamp written to MySQL

2020-09-11, from xuzh
1. The TIMESTAMP values are written into VARCHAR columns.
2. Submitted at 2020-09-09 15:25:55.416; the results were:

local_dtm               | curr_dtm                | local_dtm_no_zone       | curr_dtm_no_zone
2020-09-09 02:25:55.416 | 2020-09-08 18:25:55.416 | 2020-09-09 02:25:55.416 | 2020-09-08 18:25:55.416






---- Original message ----
From: "xuzh"


Flink dynamic partitioning strategy

2020-09-11, from venn
When running Flink streaming jobs, a few servers (in a shared cluster that also hosts many other components) often sit at high CPU, so the operators placed on those machines lag far behind the operators on other machines.

Does Flink have any dynamic partitioning or TaskManager migration strategy - something like Spark, where when a task runs very slowly the master launches an identical copy of it, and whichever copy finishes first lets the job continue downstream?

Thanks, everyone.



Flink SQL permissions

2020-09-11, from 163

Does Flink SQL support permission checks on metadata? For example, when using a Hive catalog, does it enforce Hive's permission checks? If it is not supported today, is it planned?


Re: Flink 1.5.0 savepoint failure

2020-09-11, from hk__lrzy
Does the code explicitly set the state backend address?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.10.0 checkpoints keep growing

2020-09-11, from hk__lrzy
Is the state ever cleaned up, or does each update just add on top of what is already there? Please paste your code.



--
Sent from: http://apache-flink.147419.n8.nabble.com/