Hi Hemi,

Here is one possible approach, though it may keep a lot of otherwise useless state.

As shown below:
```
CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted BOOLEAN) WITH
(.....);

INSERT INTO es_sink
SELECT f1, f2, f3
FROM (
    SELECT *,
        -- DESC keeps the latest row per key, so a later soft delete can be filtered out
        ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY PROCTIME() DESC) AS rnk
    FROM test_source
) WHERE rnk = 1 AND deleted = false;
```
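
If your table only has the deletedAt timestamp and no boolean flag, you could probably derive the flag in a view first. This is only a rough sketch: `mysql_source` and `test_source_with_flag` are made-up names, and I am assuming deletedAt is NULL for rows that have not been soft-deleted.
```
-- Hypothetical names: mysql_source is your existing MySQL source table,
-- test_source_with_flag is just an illustrative view name.
-- Assumption: deletedAt is NULL until the row is soft-deleted.
CREATE TEMPORARY VIEW test_source_with_flag AS
SELECT f1, f2, f3,
       `deletedAt` IS NOT NULL AS deleted
FROM mysql_source;
```
The query above would then read FROM test_source_with_flag instead of test_source.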

Best,
Feng

On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs <hemi...@gmail.com> wrote:

> hello everyone,
>
> right now I'm using flink to sync from mysql to elasticsearch and so far
> so good. If we insert, update, or delete it will sync from mysql to elastic
> without any problem.
>
> The problem I have right now is the application is not actually doing hard
> delete to the records in mysql, but doing soft delete (updating a deletedAt
> column).
>
> Because it's not actually doing a deletion, flink is not deleting the data
> in elastic. How do I make it so it will delete the data in elastic?
>
> Thanks
>
