Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-08 文章 jindy_liu
恩,这里有个问题就是,假设我们以离线结果为基准去对比,但离线结果一般天级或小时级,但实时部分可能是秒级的,两个结果在连线环境做比较,也不好去看这个结果有差异的时候,到底实时计算部分有没有问题! 有很多种原因可能会导致这个结果不准确。。。比如flink sql的bug或都流式消息丢失了等等! -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-07 文章 jindy_liu
有没有大佬有思路可以参考下? -- Sent from: http://apache-flink.147419.n8.nabble.com/

自定义表函数 需定义啥参数来接受 flink sql 内置函数COLLECT(DISTINCT t.tag_id) 结果?

2021-01-28 文章 jindy_liu
有个聚合结果 SELECT id, COLLECT(DISTINCT tag_id) AS tagids GROUP id 定义表函数进行处理,想处理tagids @FunctionHint(output = @DataTypeHint("ROW")) public class IdsMatched extends TableFunction { public void eval(Map tagids) { for Integer key : tagids.keySet()) { collect(Row.of(key + 10 ))

flink sql开了TwoStageOptimizedAggregateRule优化,容易jvm heap outofmemory?

2021-01-06 文章 jindy_liu
如上,同一个作业,数据也是相同的,配置差异就是有两阶段聚合,线上作业运行一断时间后 1、开两阶段聚合, 运行一段时间就会core,并且从checkpoint恢复时,必core,作业重启不了。每次都显示jvm heap不足。 2、关闭两阶段聚合,其它内存配置不变,作业运行没问题。 查看线上的core的时候的dump文件,发现一处疑似泄漏的地方。 请问下local agg操作是不是会用到java heap做聚合操作?聚合的数据量没控制好,容易引发内存问题?

Re: Flink cdc connector:数据量较大时,snapshot阶段报错

2021-01-06 文章 jindy_liu
snapshot阶段如果后端处理的慢,就容易反压,反压会造成debezium执行select * from xxx的时候会花较长时间。 这个报错一般是mysql本身的原因。出现通信错误的原因挺复杂的,需要单独看。我的原因比较坑,定位也花了些时间!!!公司DBA会进行sql语句执行时长监控,并kill掉相应的sql,从而造成上述通信异常问题, 还有一些原因比如空闲时间太长了,mysql server也会断开连接。常见的这些是改mysql的配置,社区的jark wu有一些分享配置,mysql-cdc-connector的github上也有分享。比如wait_timeout之类的。 -

flink sql cdc流(1.11.2),双流join长时间运行后,试着取消job后,再从从checkpoint处恢复运行,报outofmemorry, tm的参数配置也没有变过,rocksdb后端。 这个可能原因是啥?运行的时候没报,为啥恢复的时间报了?

2020-12-23 文章 jindy_liu
java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3181) at java.util.ArrayList.grow(ArrayList.java:261) at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:235) at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:2

flink sql数据处理时延的测试方法?

2020-12-15 文章 jindy_liu
请问下,在flink sql里,一般用啥方法去衡量一个任务里,一条mysql cdc流从输入flink,走完flink内部算子,到sink外部系统的整体时延? 或者说整个任务的时延? 总说是实时,目前也不知道处理的实时的量级! -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-10 文章 jindy_liu
感谢指正!这里我验证了下你的说法,确实我理解有误了,我以为这个参数write buffer count以及max write buffer是taskmanager所有的slots的。从web ui来看,确实是很多算子都出现了is_stop_write。你的推断是正确的,老的配置参数下,看了下,确实经常出现is_stop_write=1的情况,然后线程就阻塞sleep了。 昨天调整了一版参数:改了下Slot为2,还是6台机器,job并发度设置成12。结果是之前的阻写没有了。跑一晚上10个小时左右,能跑21000W每张表的速度了,并且现在看也没有阻写的情况,硬盘的读写iops与util都很低,

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-10 文章 jindy_liu
补充一个,当我把state.backend.rocksdb.writebuffer.count: 48调小到10的话, jstack来看,从https://spotify.github.io/threaddump-analyzer/分析来看 top类的方法基本都在rocksdb的io上了。并且很多线程都在等待 -- Sent

Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-09 文章 jindy_liu
flink sql主要涉及到9张mysql表(snapshot+cdc),任务的解析后的算子较多,大概60~70个,但主要是join,和4~5个GroupAggregate算子,最后sink,sink不是瓶颈已经排除。 恩,已经调过几版参数了我的机型的配置是一样的,12核+24G内存 + ssd 50G,共6台(任务并行度设置为60,除去了flink mysql cdc流的并行度为1,其它算子并行度都为60) taskmgr的flink-conf主要参数为,其它为默认: taskmanager.numberOfTaskSlots: 10 taskmanager.memory.process

关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-08 文章 jindy_liu
场景上: 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。 目前测试了一版本flink sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。 所以产生以下想法,不

flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2020-11-30 文章 jindy_liu
flink 版本: 1.11.2 * Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[64_40108_0_1]: version conflict, required seqNo [95958], primary term [1]. current document has seqNo [99881]

Flink cdc connector:数据量较大时,snapshot阶段报错

2020-11-27 文章 jindy_liu
信息如下: 2020-11-27 18:52:01,040 INFO io.debezium.connector.mysql.SnapshotReader [] - Step 7: - 291 of 1311846828 rows scanned from table 'tag.tag_taggedinfo' after 00:16:38.994 2020-11-27 18:52:01,088 INFO io.debezium.connector.mysql.SnapshotReader [] - St

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-27 文章 jindy_liu
谢谢jark!这几天一直在做性能调优! 1、这里针对这个简单场景目前可以在sink表的test_status表的primary key,增加一个join key。即id和status两个列作为key,这样能使用数据最终一致,算是做了下规避,能一致。复杂点的语句感觉有点难搞,有点不敢用,主要不清楚这个乱序会对其它算子有什么影响,很容易出错,确实应该在flink框架里搞了合适些。这里jark在使用flink sql cdc方面有啥建议吗? 2、关于性能这块,确实flink的rocksdb默认参数,性能很差! 按你给的文章,调了些参数,同时换了ssd硬盘后,write_buff

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-18 文章 jindy_liu
很感谢jark! 1、昨天将status表设置成时态表(Temporal Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test, status)的数据完全对称,没有倾斜问题(200w,

Re: Flink cdc 多表关联处理延迟很大

2020-11-18 文章 jindy_liu
借宝地,我们的场景很像,cdc流与cdc的join,打成宽表。 我也遇到过这种问题,cdc流与cdc的join, 当数据量大的时候,容易出问题(我的checkpoint设置的时间是2小时,超时时间只设置成了10分钟,失败次数设置得超大,超时时长太长,设置成2小时,基本上数据都流不动) 1、snapshot 的时候,老是会有i/o问题。flink侧的日志就是这样的。 ./flink--taskexecutor-0-flink-taskmanager-v1-11-2-fcf8f675f-gn8q8.log.5:146619:2020-11-14 00:19:53,578 ERROR io.d

Re: Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题

2020-11-16 文章 jindy_liu
我也遇到这种乱序问题,楼主,你那边在sql层面解决了么? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-16 文章 jindy_liu
1、试了下 在test表中增加一个proctime CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, `proctime` AS PROCTIME(), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'no_lock', 'password' = 'n

Re: Flink sql 无法用!=

2020-11-15 文章 jindy_liu
用<> -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
哦,这样啊 1、加上一个 deduplicate by sink key 节点在sql中是怎么写的? 2、另外sql 中有关键字能单独指定一条sink sql的并发度吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
图片png格式,怕看不了,我文字补充下: 1、print的最后几行。 32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0) 32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0) 32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0) 32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0) 32> +I(195,jindy195,202

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
怕图片看不清, 我文字补充下: 1、print的最后几行。 32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0) 32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0) 32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0) 32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0) 32> +I(195,jindy195,2020-07-

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
图片是屏幕截图,png格式的。忘记加后缀了。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-15 文章 jindy_liu
我又重试了次,不用重启job也会有问题,就是把并行度大于1会有问题!。 1、直接在sql-client里,启动/data/home/jindyliu/flink-demo/flink-1.11.2/bin/sql-client.sh embedded -d /data/home/jindyliu/flink-demo/flink-1.11.2//conf/sql-client-defaults.yaml sql-client-defaults.yaml的并行度设置为40. 数据一样,其中test表规模是200w条,status表11条。 源表test: CREATE TABLE te

flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-12 文章 jindy_liu
源表test: CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 't

Re: flink 1.11 cdc: 如何将DataStream 要如何转成flink sql cdc里的table?

2020-11-05 文章 jindy_liu
好的,谢谢jark! 数据是有删除的,所以看看要实现下souce方案。本来只想在上层用mapfuction进行一下合并来的,再做转换! 看来还是绕不过sql connector实现。源是kafka,看样子要想办法把kafka的流KafkaDynamicSource想办法改造下!!! -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11 cdc: 如何将DataStream 要如何转成flink sql cdc里的table?

2020-11-04 文章 jindy_liu
目前有两个DataStream的流,通过mapfunction, 转成DataStream流,请问DataStream怎么转成table,并使用flink sql进行操作。 *(注:因为mapfunction对流做了些顺序的限制,目前无法无法直接用flink sql cdc直接定义表!!!)* *目前我的做法会报错:* StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); DataStreamSource json1 // canal json的格式 DataStrea

flink sql client:cdc 至elasticsearch 有问题(版本1.11.0),提交任务成功,但web dashbord上却看不到任务!求看是不是bug?还是我配置有问题?

2020-08-11 文章 jindy_liu
建表如下: CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_es', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='latest-offse

flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-20 文章 jindy_liu
例如: mysql表: CREATE TABLE `test` ( `id` int(11) NOT NULL, `name` varchar(255) NOT NULL, `time` datetime NOT NULL, `status` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `status` ( `id` int(11) NOT NULL, `name` varchar(255) NOT NULL, PRIMARY

Re: flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

2020-07-15 文章 jindy_liu
确实是这行导致的, 如果都重构了,那应该怎么用较好的? 我需要知道每一行对应的是insert, update还是delete事件。 或者问题变一下,对于这种api,一般遵守什么规则,flink的版本兼容性会更好? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

2020-07-14 文章 jindy_liu
代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化? 麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗? at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 jindy_liu
本来还想尽最大可能的复用源码,看了下JdbcOutputFormat的源码实现,batch size是sql语句的个数据,kafka的batch size是字节数,两个协调不好,两个各sink各自的时间阈值也同步不了。 我准备按你的说的方式,用RichFlatMapFunction,里面实现实现一个buffer。 等buffer达阈值或定时时间条件满足时,一次性手动调用JdbcOutputFormat(可以设置更大的buffer值)的writeRecord和flush;不满足的时候,RichFlatMapFunction里不输出元素; 这样kafka的batch sinka节奏应该就不用管

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 jindy_liu
如果可以chain在一起,这个可以保证顺序性,我去试试。 这里再追问下,实际中,如里单流源里的数据也要顺序处理,可以设置并行度为1; 这里可能也要考虑下mysqlSinkFunction里的sink效率问题,需要积累一些数据再sink,这里可以做到跟kafka sink的batch_size和linger.ms配合联动吗?比如满1000条,或超过1s,就同时sink? 谢谢~ -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Re: Flink 多Sink 数据一致性保证

2020-07-12 文章 jindy_liu
原理大概理解了,想自己实现一个。比如kafka与mysql的实现,并想最大程度的复用些代码。 看了下源码,感觉要把现在的connector(kafka, jdbc)中的代码都看一下,然后扣出来,再去按twophasecommitsinkfunction的实现,重组一些代码,一个个方法实现。 另外问一下,好像现在源码里的jdbc只是at-least-once实现? -- Sent from: http://apache-flink.147419.n8.nabble.com/

单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-10 文章 jindy_liu
请问下,有没有大佬做过类似的事情? 另外,flink side out功能,可以将单流分成多流,但是不是分成多流后,但两条流sink的时候,是不是没法保证sink时候的时序? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink 多Sink 数据一致性保证

2020-07-08 文章 jindy_liu
请问下,你这个最后是怎么做到的,能share下源码吗? 是需要将两个sink合并到一个sink里,然后再实现下二阶段提交吗? 我也遇到个多sink的原子性场景。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-08 文章 jindy_liu
如果用变更时间来有序,可能会有一个问题,是如果数据变更太快了,两条先后数据的时间可能是一样的? 这个问题不知道是不是可以用bin-log的gtid来搞?至少是递增的? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-08 文章 jindy_liu
恩,主要是看flink 的发布里说flink 支持cdc了,感觉这个功能好像是我要的,感觉好像我要做的事情能用flink都搞定。就不用多个开源平台切换与维护多个组件了。 我原本还想先基于flink sql 将数据存量数据先全量导一次异构存储(如hbase, pgsql等)(批量),然后再flink cdc 把mysql的bin-log变化数据搬运到异构存储(如hbase, pgsql等)后(增量),同时再镜像一份cdc后的kafka里的json数据到下游(变化通知)。 那么下游再基于镜像的kafka里的数据(变化)+异构的镜像数据,再基于flink去做一些实时计算的场景需求(比如最近一个

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 jindy_liu
对的,只不过生产中,有些设计的时候外键没有显示声明,都是用流程保证更新表的顺序。 所以消费数据变化的时候,也是要按顺序消费。不然使用镜像数据的人,可能会出问题。 求教:除flink sql 的cdc功能外,flink的其它特性能否较好的支持这种场景呢? 需要写再底层点的api吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 jindy_liu
https://github.com/apache/flink/tree/release-1.11.0我看github上tag下已经发布了release-1.11.0,我就编了下tag下的release-1.11.0。最近在做实时计算的一些调研,我们第一步就是要做数据的实时搬运(异构存储),看flink 1.11有cdc功能,我关注了下。看发布了就立即试用了下,看看能不能用你们这个做变化数据的实时同步。1、体验了下,若mysql的binlog按单表有序到kafka,单topic,单分区,flink cdc的同步确实很方便,几条sql语句就搞定了。2、若mysql binlog按db实例,多表

flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 jindy_liu
场景:canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序; 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢? 例如mysql实例db1中有表test, statusCREATE TABLE `test` ( `id` int(11) NOT NULL, `name` varchar(255) NOT NULL, `time` datetime NOT NULL, `status` int(11) NOT NULL, PRIMA