Hi, 可能要魔改一下Upsert Kafka 
Sink,在写入时增加你的逻辑,因为在写入sink的时候,是可以感知到每条消息的RowKind的,用RowData#getRowKind即可。




[1] 
https://github.com/apache/flink-connector-kafka/blob/7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L156
 




--

    Best!
    Xuyang





在 2025-02-27 23:44:03,"casel.chen" <casel_c...@126.com> 写道:
>场景是上游mysql binlog通过canal json格式实时同步进kafka topic,通过flink sql作业写到下游的upsert 
>kafka,针对delete事件做了一个逻辑删除转换。即新增一个 is_delete 标记字段,将原本retract类型消息转换成append 
>only类型消息。请问这个能用flink sql来实现么?如果能的话可否给个例子

回复