Casel 您好,

如果您的同步逻辑比较简单,也可以尝试下使用 CDC YAML 提供的 Pipeline 进行同步。

这样的 transform 规则可以对上游传来的 DELETE 事件进行转换[1]:

```
source:
  type: mysql
  # …

sink:
  type: kafka
  # ...

transform:
  - source-table: \.*.\.*
    projection: \*, __data_event_type__ AS op_type # 将原始操作的 op_type 追加到行尾
    converter-after-transform: SOFT_DELETE
```

[1] 
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/core-concept/transform/#converter-after-transform

Best Regards,
Xiqian

2025年2月27日 23:44,casel.chen <casel_c...@126.com> 写道:

场景是上游mysql binlog通过canal json格式实时同步进kafka topic,通过flink sql作业写到下游的upsert 
kafka,针对delete事件做了一个逻辑删除转换。即新增一个 is_delete 标记字段,将原本retract类型消息转换成append 
only类型消息。请问这个能用flink sql来实现么?如果能的话可否给个例子

回复