Hi Leonard,

> From my understanding, your case is not a pure deduplication case: you
> want to keep both the previous record and the current record, so the
> deduplication query cannot satisfy your requirement.
>

Indeed, that's what I came to realise during our discussion on this email
chain. I'm sorry if it caused confusion. I'm still not sure how to express
this requirement concisely: "deduplicate, but let a previous value come
back after a different value has appeared".


> Keeping the last row in deduplication always produces a changelog stream,
> because we need to retract the previous last value and send the new last
> value. When sinking a changelog stream, you could use a connector that
> supports upserts, like the HBase, JDBC, or upsert-kafka connectors; the
> plain Kafka connector can only accept an append-only stream, which is why
> you got that message.
>

That's what I understood indeed. But in my case I really do want to insert,
not upsert.
Just for context: the goal is to historize Kafka messages in real time.
Each message can potentially be split to store information in multiple
tables (in my example: name and address would be inserted into 2 different
tables), and the history should be kept and enriched with the ingestion
date. The fact that the Kafka message can be split across multiple tables
creates that "deduplication" requirement (in my example, the address could
have changed but not the name, and we don't want to add a record with no
business value to the table containing the names). And of course, a field
can change twice and end up with the same value again, and that is business
information we do want to keep.
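To make the intent concrete, here is roughly what I'm aiming at (a sketch
only: the names_history/addresses_history table names come from my
simplified example, and the "only when the value changed" filter is exactly
the part this thread is about):

```sql
-- Sketch only: split each customers message into two history tables,
-- enriched with the ingestion time.
INSERT INTO names_history
SELECT client_number, name, PROCTIME() AS ingestion_ts
FROM customers;
-- ...but only when name differs from this client's previous name.

INSERT INTO addresses_history
SELECT client_number, address, PROCTIME() AS ingestion_ts
FROM customers;
-- ...but only when address differs from this client's previous address.
```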


> The LAG function is used in over-window aggregation and should work in
> your case, but unfortunately it looks like the LAG function is not
> implemented correctly; I created an issue[1] to fix this.
>

Thanks a lot! I'll follow the issue.
I would love to try to fix it... but after a quick look at that code, I'm
not sure it's the best place to start contributing. I don't understand what
should be changed in it, let alone where the generated code comes from and
how it should be fixed...
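For reference, this is roughly the LAG-based query I had in mind (a sketch
only; it assumes FLINK-20405 is fixed and that proctime is declared as a
processing-time attribute column on customers):

```sql
-- Sketch, assuming FLINK-20405 is fixed and `proctime` is a declared
-- processing-time attribute column on `customers`.
SELECT client_number, address
FROM (
  SELECT
    client_number,
    address,
    LAG(address, 1) OVER (
      PARTITION BY client_number
      ORDER BY proctime
    ) AS prev_address
  FROM customers
)
-- Keep the first value per client, and every value that differs from
-- the immediately preceding one (so an old value can reappear later).
WHERE prev_address IS NULL OR address <> prev_address;
```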


In the meantime, I guess the only other option would be MATCH_RECOGNIZE?
Do you think you could help me find what I did wrong in this query:

SELECT *
FROM customers
MATCH_RECOGNIZE (
  PARTITION BY client_number
  ORDER BY proctime()
  MEASURES
    B.client_number AS client_number,
    B.address AS address
  PATTERN (A* B)
  DEFINE
    B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
) AS T;

I get the following error:
SQL validation failed. Index 0 out of bounds for length 0

Thanks a lot for your help!

Laurent.


>
>
> Best,
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-20405
>
> On Fri, 27 Nov 2020 at 03:28, Leonard Xu <xbjt...@gmail.com> wrote:
>
>> Hi, Laurent
>>
>> Basically, I need to deduplicate, *but only keeping in the deduplication
>> state the latest value of the changed column* to compare with. While
>> here it seems to keep all previous values…
>>
>>
>> You can use `ORDER BY proctime() DESC` in the deduplication query; it
>> will keep the last row, which I think is what you want.
>>
>> BTW, deduplication supports event time in 1.12; this will be available
>> soon.
>>
>> Best,
>> Leonard
>>
>>
>

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00

*euranova.eu <http://euranova.eu/>*
*research.euranova.eu* <http://research.euranova.eu/>

♻ Be green, keep it on the screen
