??????flink sql cdc????kafka????????????????????

2021-04-21 文章 ????
flink-cdcSourceRecord??SourceRecord??topic?? ??Debezium mysql-conectorkafka-connectortopic?? ?? ??+??+topic??  ??

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-21 文章 Xi Shen
Cache设置大小为2w,超时时间为2h 实际上整个表大小为3w左右,考虑到整个表实际只有十几兆。我会尝试cache size设置为4w,保证整个表都能装进cache里。看会不会好一点 但是我查到现在怀疑跟savepoint有关: - 如果我设置kafka offset=earliest,不带savepoint重启,flink job启动消费时,lag有5000w左右,但是1分钟内就能达到约7k/s的消费速度。如下图,job在14:31启动,前面的速度特别大是因为offset重置,但是在14:33已经达到7.5k的消费速度

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-21 文章 Xi Shen
读JDBC table是有缓存的,看了源码,是用Guava cache实现 文档上说,整个Task Manager进程共享使用一个Cache,所以应该和广播的效果是一样的?所以应该不是查询TiDB导致的性能问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复:flink sql cdc发到kafka消息表名信息缺失问题

2021-04-21 文章 casel.chen
我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。 在 2021-04-22 11:01:22,"飞翔" 写道: 既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka, 比如canal的样例,虽然after 不是很全,你可以自己去构造补全,这样你采用debezium不就好

Re:Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-21 文章 李一飞
明白了,谢谢~ 在 2021-04-21 19:58:23,"Peihui He" 写道: >fetch.min.bytes >fetch.wait.max.ms >还可以用着两个参数控制下的 > >熊云昆 于2021年4月21日周三 下午7:10写道: > >> 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 >> >> >> | | >> 熊云昆 >> | >> | >> 邮箱:xiongyun...@163.com >> | >> >> 签名由 网易邮箱大师 定制 >> >> 在2021年04月20日 18:19,李一飞 写道:

Re:回复:flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-21 文章 李一飞
谢谢 在 2021-04-21 19:10:17,"熊云昆" 写道: >有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 > > >| | >熊云昆 >| >| >邮箱:xiongyun...@163.com >| > >签名由 网易邮箱大师 定制 > >在2021年04月20日 18:19,李一飞 写道: >flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置 >最好分流、批场景回答一下,谢谢!

?????? flink sql????kafka join??????????????????????

2021-04-21 文章 ????
Tidb??Tidb??TiDBstructured-streaming?? ?? --  -- ??:

????

2021-04-21 文章 ????

自定义RocketMqSource出现位点重置现象

2021-04-21 文章 zelin jin
大家好,公司内部写的自定义RocektMqSource,会偶现位点前移的现象,偶现时间不定,目前找不出原因。Flink 版本 1.4.2,目前是不会从checkPoint恢复,但是会做checkPoint Source代码如下: public class RocketMQSource extends RichParallelSourceFunction implements CheckpointedFunction,ResultTypeQueryable { public static final int DELAY_MSG_NOT_FOUND = 1

??????flink sql cdc????kafka????????????????????

2021-04-21 文章 ????
??flink??debeziumcanal??kafka, canalafter ??debeziumflink-cdc??debezium??record -

Re: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-21 文章 Xi Shen
为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表 我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟 所以应该是维表JOIN的问题 现在连的数据库是TiDB,连接串属性为 useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 設置look up table source

2021-04-21 文章 HunterXHunter
理论上只要实现了LookupTableSource。你在 TableFunction 里面怎么重写 eval 都可以,不管你是要读取哪里的数据怎么读。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql cdc发到kafka消息表名信息缺失问题

2021-04-21 文章 casel.chen
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table 这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢? CREATE TABLE `binlog_ta

flink在yarn集群上启动的问题

2021-04-21 文章 tanggen...@163.com
您好,我在向yarn 集群提交flink任务时遇到了一些问题,希望能帮忙回答一下 我布署了一个三个节点hadoop集群,两个工作节点为4c24G,yarn-site中配置了8个vcore,可用内存为20G,总共是16vcore 40G的资源,现在我向yarn提交了两个任务,分别分配了3vcore,6G内存,共消耗6vcore,12G内存,从hadoop的web ui上也能反映这一点,如下图: 但是当我提交第三个任务时,却无法提交成功,没有明显的报错日志,可是整个集群的资源明显是充足的,所以不知道问题是出现在哪里,还请多多指教 附1(控制台输出): The program finished

Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-21 文章 王 小宝
发自我的iPhone > 在 2021年4月21日,19:58,Peihui He 写道: > > fetch.min.bytes > fetch.wait.max.ms > 还可以用着两个参数控制下的 > > 熊云昆 于2021年4月21日周三 下午7:10写道: > >> 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 >> >> >> | | >> 熊云昆 >> | >> | >> 邮箱:xiongyun...@163.com >> | >> >> 签名由 网易邮箱大师 定制 >> >> 在2021年04月

Re: 設置look up table source

2021-04-21 文章 Chongaih Hau
Hi Leonard, 好的謝謝你的回覆 Regards, *Hau ChongAih* On Wed, Apr 21, 2021 at 7:27 PM Leonard Xu wrote: > Hi, ChongAih > > 你可以参考 JdbcDynamicTableSource [1] 这个 table source 实现了 LookupTableSource > 接口,你需要写一个类似 JdbcRowDataLookupFunction 即可 > 的函数即可。 > > 祝好, > Leonard > [1] > https://github.com/apache/fl

关于Flink 非blink planer下自持listagg

2021-04-21 文章 张海深
你好,请问Flink 是否支持非blink planer下的 listagg,有计划支持吗。现阶段如果想使用listagg,请问有什么好的方法支持吗

[Flink-1.8.1]POJO????????????????????????????

2021-04-21 文章 Edc
POJO??-?? public class User {     private String name;     private String age;     @Override     public String toString() {         return "User{" +                 "name='" + name + '\'' +                 ", age='" + age + '\'' +                 '}';     }     public User(String

Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-21 文章 Peihui He
fetch.min.bytes fetch.wait.max.ms 还可以用着两个参数控制下的 熊云昆 于2021年4月21日周三 下午7:10写道: > 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 > > > | | > 熊云昆 > | > | > 邮箱:xiongyun...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2021年04月20日 18:19,李一飞 写道: > flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置 > 最好分流、批场景回答一下,谢谢!

Re: 設置look up table source

2021-04-21 文章 Leonard Xu
Hi, ChongAih 你可以参考 JdbcDynamicTableSource [1] 这个 table source 实现了 LookupTableSource 接口,你需要写一个类似 JdbcRowDataLookupFunction 即可 的函数即可。 祝好, Leonard [1] https://github.com/apache/flink/blob/4be9aff3eccb3808df1f10ef7c30480ec11a9cb0/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/

回复:flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-21 文章 熊云昆
有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 | | 熊云昆 | | 邮箱:xiongyun...@163.com | 签名由 网易邮箱大师 定制 在2021年04月20日 18:19,李一飞 写道: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置 最好分流、批场景回答一下,谢谢!

Re:回复: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 casel.chen
我看了源码,即使改换成debezium json格式输出,也得不到原本debezium json数据,因为输出字段只有有限的3个,没有关键的库表信息。而且看了其他几个cdc格式,都有类似的问题 想知道是为什么?追踪到上游debezium emitRecords方法,参数record就只有rowdata和rowkind信息,没有table和database DebeziumJsonSerializationSchema.java private static RowType createJsonRowType(DataType databaseSchema) { /

Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic

2021-04-21 文章 HunterXHunter
在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark -- Sent from: http://apache-flink.147419.n8.nabble.com/

設置look up table source

2021-04-21 文章 Chongaih Hau
hi all, flink在使用temporal join只支持look up table source。我在做單元測試的時候, 下載了hive 表裡面的數據,嘗試了用filesystem註冊temporal table。可是後來發現file system不支持lookup。查詢了文檔( https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/legacySourceSinks.html),用戶可以自定義look up table source。可是我找不到類似用csv設置look up table source的方