Hi Hemi, One possible way, but it may generate many useless states.
As shown below: ``` CREATE TABLE test_source (f1 xxx, f2, xxx, f3 xxx, deleted boolean) with (.....); INSERT INTO es_sink SELECT f1, f2, f3 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime()) as rnk from test_source ) where rnk = 1 AND deleted = false; ``` Best, Feng On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs <hemi...@gmail.com> wrote: > hello everyone, > > right now I'm using flink to sync from mysql to elasticsearch and so far > so good. If we insert, update, or delete it will sync from mysql to elastic > without any problem. > > The problem I have right now is the application is not actually doing hard > delete to the records in mysql, but doing soft delete (updating a deletedAt > column). > > Because it's not actually doing a deletion, flink is not deleting the data > in elastic. How do I make it so it will delete the data in elastic? > > Thanks >