场景是上游mysql binlog通过canal json格式实时同步进kafka topic,通过flink sql作业写到下游的upsert 
kafka,针对delete事件做了一个逻辑删除转换。即新增一个 is_delete 标记字段,将原本retract类型消息转换成append 
only类型消息。请问这个能用flink sql来实现么?如果能的话可否给个例子

回复