Casel 您好, 如果您的同步逻辑比较简单,也可以尝试下使用 CDC YAML 提供的 Pipeline 进行同步。
这样的 transform 规则可以对上游传来的 DELETE 事件进行转换[1]: ``` source: type: mysql # … sink: type: kafka # ... transform: - source-table: \.*.\.* projection: \*, __data_event_type__ AS op_type # 将原始操作的 op_type 追加到行尾 converter-after-transform: SOFT_DELETE ``` [1] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/core-concept/transform/#converter-after-transform Best Regards, Xiqian 2025年2月27日 23:44,casel.chen <casel_c...@126.com> 写道: 场景是上游mysql binlog通过canal json格式实时同步进kafka topic,通过flink sql作业写到下游的upsert kafka,针对delete事件做了一个逻辑删除转换。即新增一个 is_delete 标记字段,将原本retract类型消息转换成append only类型消息。请问这个能用flink sql来实现么?如果能的话可否给个例子