flink cdc data stream api sourceRecord解析

2021-09-17 文章 Fisher Xiang
Hi,

RT,对于data stream 消费binlog得到的sourceRecord,是一些string类型的struct类型数据,
 请问官方有什么好的办法去解析这些string类型的struct数据吗?使用反射? 目标是解析成Java对象。

附上数据:

SourceRecord{sourcePartition={server=mysql_binlog_source},
sourceOffset={file=mysql-bin.01, pos=720, row=1, snapshot=true}}
ConnectRecord{topic='mysql_binlog_source.flink_cdc.student',
kafkaPartition=null, key=Struct{id=1},
keySchema=Schema{mysql_binlog_source.flink_cdc.student.Key:STRUCT},
value=Struct{after=Struct{id=1,name=fisher,age=28},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=flink_cdc,table=student,server_id=0,file=mysql-bin.01,pos=720,row=0},op=c,ts_ms=1631845080195},
valueSchema=Schema{mysql_binlog_source.flink_cdc.student.Envelope:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)}


BR
Fisher


Re: flink-connector-postgres-cdc:no changes will be captured 无数据捕获到

2021-09-06 文章 Fisher Xiang
Thank u Leonard.
我把运行日志放在附件了,麻烦看下
使用的用户是:role 'repl' [superuser: true, replication: true, inherit: true,
create role: false, create db: false, can log in: true]


BR
Fisher


On Mon, Sep 6, 2021 at 5:55 PM Leonard Xu  wrote:

> Hello, Fisher
>
> 图挂了,可以用图床工具贴下吗?
> 我可以帮忙看看
>
> 祝好,
> Leonard
>
> > 在 2021年9月6日,17:48,Fisher Xiang  写道:
> >
> > hi,
> >
> > 在使用  flink-connector-postgres-cdc时(版本从1.1.1 ~ 1.4.0都试过), 出现一个警告:
> > WARN io.debezium.relational.RelationalDatabaseSchema - After applying
> the include/exclude list filters, no changes will be captured. Please check
> your configuration!
> > 启动配置是,Starting PostgresConnectorTask with configuration :
> >
> >
> >
> > 然后,往这些表里面(  table.whitelist = stud,data_input,data_output
> )insert和delete记录时,没有捕获到变更数据,只有以下输出,求看是什么问题:
> >
> >
> >
> > 使用的用户是:role 'repl' [superuser: true, replication: true, inherit: true,
> create role: false, create db: false, can log in: true]
> >
> > BR
> > Fisher
>
>


flink-connector-postgres-cdc:no changes will be captured 无数据捕获到

2021-09-06 文章 Fisher Xiang
hi,

在使用  flink-connector-postgres-cdc时(版本从1.1.1 ~ 1.4.0都试过), 出现一个警告:
WARN io.debezium.relational.RelationalDatabaseSchema - After applying the
include/exclude list filters, no changes will be captured. Please check
your configuration!
启动配置是,Starting PostgresConnectorTask with configuration :
[image: image.png]


然后,往这些表里面(  table.whitelist = stud,data_input,data_output
)insert和delete记录时,没有捕获到变更数据,只有以下输出,求看是什么问题:

[image: image.png]

使用的用户是:role 'repl' [superuser: true, replication: true, inherit: true,
create role: false, create db: false, can log in: true]

BR
Fisher


请教union算子union多个source 流时的健壮性如何保证

2021-07-21 文章 Fisher Xiang
请问大家在使用 union算子union多个 stream时,比如 stream1.union(stream2, stream3, … stream
n) ,其中1到n分别来自不同的MQ 集群MQ1, MQ2… MQ n, 当其中几个集群挂掉时,
整个flink 应用都会重启,那么该场景下怎么可以做到 某几条stream 异常挂掉后,而不影响其他流的 union,让整个 flink继续运行呢?

[image: image.png]

BR
Fisher


自定义metrics reporter 如何不通过flink conf来注册并生效

2020-07-23 文章 Fisher Xiang
Hi all,

请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义
customerReporter 如何*能在 代码env里面注册并实现metric上报*,要求不在flink conf.xml 文件里面配置
该customerReporter的信息?

需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即
*适用于多个flink
任务*从而避开重复造轮子。

thx

BR
Fisher