flink cdc data stream api sourceRecord解析
Hi, RT,对于data stream 消费binlog得到的sourceRecord,是一些string类型的struct类型数据, 请问官方有什么好的办法去解析这些string类型的struct数据吗?使用反射? 目标是解析成Java对象。 附上数据: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.01, pos=720, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.flink_cdc.student', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink_cdc.student.Key:STRUCT}, value=Struct{after=Struct{id=1,name=fisher,age=28},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=flink_cdc,table=student,server_id=0,file=mysql-bin.01,pos=720,row=0},op=c,ts_ms=1631845080195}, valueSchema=Schema{mysql_binlog_source.flink_cdc.student.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} BR Fisher
Re: flink-connector-postgres-cdc:no changes will be captured 无数据捕获到
Thank u Leonard. 我把运行日志放在附件了,麻烦看下 使用的用户是:role 'repl' [superuser: true, replication: true, inherit: true, create role: false, create db: false, can log in: true] BR Fisher On Mon, Sep 6, 2021 at 5:55 PM Leonard Xu wrote: > Hello, Fisher > > 图挂了,可以用图床工具贴下吗? > 我可以帮忙看看 > > 祝好, > Leonard > > > 在 2021年9月6日,17:48,Fisher Xiang 写道: > > > > hi, > > > > 在使用 flink-connector-postgres-cdc时(版本从1.1.1 ~ 1.4.0都试过), 出现一个警告: > > WARN io.debezium.relational.RelationalDatabaseSchema - After applying > the include/exclude list filters, no changes will be captured. Please check > your configuration! > > 启动配置是,Starting PostgresConnectorTask with configuration : > > > > > > > > 然后,往这些表里面( table.whitelist = stud,data_input,data_output > )insert和delete记录时,没有捕获到变更数据,只有以下输出,求看是什么问题: > > > > > > > > 使用的用户是:role 'repl' [superuser: true, replication: true, inherit: true, > create role: false, create db: false, can log in: true] > > > > BR > > Fisher > >
flink-connector-postgres-cdc:no changes will be captured 无数据捕获到
hi, 在使用 flink-connector-postgres-cdc时(版本从1.1.1 ~ 1.4.0都试过), 出现一个警告: WARN io.debezium.relational.RelationalDatabaseSchema - After applying the include/exclude list filters, no changes will be captured. Please check your configuration! 启动配置是,Starting PostgresConnectorTask with configuration : [image: image.png] 然后,往这些表里面( table.whitelist = stud,data_input,data_output )insert和delete记录时,没有捕获到变更数据,只有以下输出,求看是什么问题: [image: image.png] 使用的用户是:role 'repl' [superuser: true, replication: true, inherit: true, create role: false, create db: false, can log in: true] BR Fisher
请教union算子union多个source 流时的健壮性如何保证
请问大家在使用 union算子union多个 stream时,比如 stream1.union(stream2, stream3, … stream n) ,其中1到n分别来自不同的MQ 集群MQ1, MQ2… MQ n, 当其中几个集群挂掉时, 整个flink 应用都会重启,那么该场景下怎么可以做到 某几条stream 异常挂掉后,而不影响其他流的 union,让整个 flink继续运行呢? [image: image.png] BR Fisher
自定义metrics reporter 如何不通过flink conf来注册并生效
Hi all, 请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义 customerReporter 如何*能在 代码env里面注册并实现metric上报*,要求不在flink conf.xml 文件里面配置 该customerReporter的信息? 需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即 *适用于多个flink 任务*从而避开重复造轮子。 thx BR Fisher