Re: Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-08 文章 Jane Chan
从 plan 上看起来在 sink 节点这里因为推导不出 upsert key 加上了 SinkUpsertMaterializer[1], 这里会按照 sink 表定义的主键进行 keyby shuffle[2], 只能保证最终一致性. 另外你的操作描述中 schema 为三列, 但 DDL 是四列, 且格式乱了. 一些可能的建议如下 1. 如果上游数据有主键并且也是 rowid 的话, 建议在 Flink source 表上声明 PK, 避免额外生成 materializer 节点; 同时注意在声明 Flink source 表时不要带上 metadata 列 (比如 op),

Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-05 文章 陈佳豪
hi 早上好 我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下 == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机]) +- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1],