Hi Laurent,

Did you manage to find the error in your MATCH_RECOGNIZE statement? If I
had to guess, I'd say it's because you are accessing A, but due to the `*`
quantifier there might actually be no A event at all (for example, for a
client's first event), so `LAST(A.address, 1)` has nothing to refer to.
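
To pin down the intended semantics, consider the rule as a reference model. Emit a row whenever a client's address differs from that client's previous address. Also emit it when there is no previous address at all, which is exactly the case where `A*` matches zero events. Below is a minimal sketch in plain Python, not Flink. The function name `emit_changes` and the `(client_number, address)` tuple input are hypothetical, chosen only for illustration. It keeps only the latest address per client as state, so an old value is emitted again once a different value has appeared in between.

```python
from typing import Dict, Iterable, List, Tuple


def emit_changes(events: Iterable[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Hypothetical reference model (not Flink code).

    Returns the (client_number, address) events whose address differs from
    the previous address seen for the same client_number. Only the latest
    address per client is kept, so a value can come back after a change.
    """
    last_address: Dict[str, str] = {}  # latest address per client_number
    changes: List[Tuple[str, str]] = []
    for client_number, address in events:
        # First event of a client (no previous row, i.e. "zero A events")
        # or an address different from the previous one: emit it.
        if client_number not in last_address or last_address[client_number] != address:
            changes.append((client_number, address))
        last_address[client_number] = address
    return changes


if __name__ == "__main__":
    sample = [("c1", "Main St"), ("c1", "Main St"), ("c1", "Oak Ave"),
              ("c1", "Main St"), ("c2", "Elm Rd")]
    for change in emit_changes(sample):
        print(change)
```

In SQL terms, this is the filter `prev IS NULL OR address <> prev` applied to `LAG(address) OVER (PARTITION BY client_number ORDER BY ...)`. The first event per client is the one that needs the explicit "no previous row" branch.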

Cheers,

Konstantin



On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens <
laurent.exste...@euranova.eu> wrote:

> Hi Leonard,
>
>
>> From my understanding, your case is not a pure deduplication case: you
>> want to keep both the previous record and the current record, so the
>> deduplication query cannot satisfy your requirement.
>>
>
> Indeed, that's what I came to realise during our discussion on this email
> chain. I'm sorry if it caused any confusion. I'm still not sure how to
> express this requirement concisely: "the need to deduplicate, but let
> previous values come back after a different value has appeared".
>
>
>> Keeping the last row in deduplication always produces a changelog stream,
>> because we need to retract the previous last value and send the new last
>> value. When sinking a changelog stream, you could use a connector that
>> supports upsert sinks, like the HBase, JDBC or upsert-kafka connectors. The
>> kafka connector can only accept an append-only stream, which is why you got
>> that message.
>>
>
> That's what I understood indeed. But in my case I really do want to insert
> and not upsert.
> Just for information: the goal is to be able to historize Kafka messages
> (keep their full history) in real time. Each message could potentially be
> split to store information in multiple tables (in my example, name and
> address would be inserted into 2 different tables), and the history should
> be kept and enriched with the ingestion date. The fact that a Kafka message
> can be split across multiple tables creates that "deduplication"
> requirement (in my example the address could have changed but not the name,
> and we don't want to add a record with no business value to the table
> containing the names). And of course, a field can change twice and end up
> with its earlier value again, and that's business information we do want
> to keep.
>
>
>> The LAG function is used in over window aggregations and should work in
>> your case, but unfortunately it looks like the LAG function is not
>> implemented correctly. I created an issue[1] to fix this.
>>
>
> Thanks a lot! I'll follow the issue.
> I would love to try to fix it, but from a quick look at that code, I'm not
> sure it's the best place to start contributing. I don't understand what
> should be changed in that code, let alone what generated it and how it
> should be fixed.
>
>
> In the meantime, I guess the only other option would be
> MATCH_RECOGNIZE?
> Do you think you could help me find what I did wrong in this query:
>
> SELECT *
> FROM customers
> MATCH_RECOGNIZE (
>   PARTITION BY client_number
>   ORDER BY proctime()
>   MEASURES
>     B.client_number as client_number,
>     B.address as address
>   PATTERN (A* B)
>   DEFINE
>     B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
> ) as T;
>
> I get the following error:
> SQL validation failed. Index 0 out of bounds for length 0
>
> Thanks a lot for your help!
>
> Laurent.
>
>
>>
>>
>> Best,
>> Leonard
>> [1] https://issues.apache.org/jira/browse/FLINK-20405
>>
>> On Fri, 27 Nov 2020 at 03:28, Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi, Laurent
>>>
>>> Basically, I need to deduplicate, *but only keeping in the
>>> deduplication state the latest value of the changed column* to compare
>>> with, whereas here it seems to keep all previous values…
>>>
>>>
>>> You can use `ORDER BY proctime() DESC` in the deduplication query; it
>>> will keep the last row. I think that's what you want.
>>>
>>> BTW, deduplication supports event time in 1.12, which will be
>>> available soon.
>>>
>>> Best,
>>> Leonard
>>>
>>>
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>>
>>
>> *euranova.eu <http://euranova.eu/>*
>> *research.euranova.eu* <http://research.euranova.eu/>
>>
>> ♻ Be green, keep it on the screen
>>
>>
>>
>



-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
