Hi Hemi,
Here is one possible approach, though note that it may keep a lot of
unnecessary state:
```
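CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted BOOLEAN) WITH
(.....);
-- Keep only the latest row per key; drop it once it is soft-deleted.
INSERT INTO es_sink
SELECT f1, f2, f3
FROM (
  SELECT *,
    -- DESC keeps the most recent row per key (ASC would keep the first one)
    ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY PROCTIME() DESC) AS rnk
  FROM test_source
) WHERE rnk = 1 AND deleted = false;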
```
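If your table only has the deletedAt column from your description rather
than a boolean flag, the same filter can be written against that column.
This is a minimal, untested sketch. It assumes deletedAt is NULL for live
rows, and that es_sink declares a primary key so the Elasticsearch
connector runs in upsert mode and can apply deletes. The column names
f1, f2, f3 are the same placeholders as above.

```sql
-- Assumption: `deletedAt` IS NULL while the record is still live.
INSERT INTO es_sink
SELECT f1, f2, f3
FROM (
  SELECT *,
    -- DESC keeps the most recent row per key
    ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY PROCTIME() DESC) AS rnk
  FROM test_source
)
WHERE rnk = 1 AND `deletedAt` IS NULL;
```

Once the latest row for a key has deletedAt set, it no longer passes the
filter, so the previously emitted row should be retracted from the sink.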
Best,
Feng
On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs <[email protected]> wrote:
> Hello everyone,
>
> Right now I'm using Flink to sync from MySQL to Elasticsearch, and so far
> so good. If we insert, update, or delete a record, it syncs from MySQL to
> Elasticsearch without any problem.
>
> The problem I have right now is that the application does not actually
> hard-delete records in MySQL; it soft-deletes them (by updating a deletedAt
> column).
>
> Because it's not actually a deletion, Flink does not delete the data
> in Elasticsearch. How do I make it delete the data in Elasticsearch?
>
> Thanks
>