Re: FlinkSQL kafka->dedup->kafka

2020-12-15 Thread Konstantin Knauf
Hi Laurent,

Did you manage to find the error in your MATCH_RECOGNIZE statement? If I
had to take a guess, I'd say it's because you are accessing A, but due to
the quantifier of * there might actually be no event A.

Cheers,

Konstantin



On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens <
laurent.exste...@euranova.eu> wrote:

> Hi Leonard,
>
>
>> From my understanding, your case is not a pure deduplication case: you want
>> to keep both the previous record and the current record, so the
>> deduplication query cannot satisfy your requirement.
>>
>
> Indeed, that's what I came to realise during our discussion on this email
> chain. I'm sorry if it caused confusion. I'm still not sure how to express
> this requirement in a concise way: "the need to deduplicate but let
> previous values come back after a different value has appeared"
>
>
>> Keeping the last row in deduplication always produces a changelog stream,
>> because we need to retract the previous last value and send the new last
>> value. You could use a connector that supports upsert sinks, like HBase, JDBC
>> or the upsert-kafka connector, when sinking a changelog stream; the kafka
>> connector can only accept an append-only stream, which is why you got that message.
>>
>
> That's what I understood indeed. But in my case I really do want to insert
> and not upsert.
> Just for information: the goal is to be able to historize kafka messages
> in real-time. Each message could potentially be split to store
> information in multiple tables (in my example: name and address would be
> inserted in 2 different tables), and the history should be kept and
> enriched with the ingestion date. The fact that the kafka message can be
> split to be stored in multiple tables creates that "deduplication"
> requirement (in my example the address could have changed but not the name,
> and we don't want to add a record with no business value in the table
> containing the names). And of course, a field can be changed twice and as a
> result have the same value again, and that's business information we do
> want to keep.
>
>
>> The LAG function is used in over window aggregation and should work in
>> your case, but unfortunately it looks like the LAG function is not implemented
>> correctly; I created an issue[1] to fix this.
>>
>
> Thanks a lot! I'll follow the issue.
> I would love to try to fix it... but quickly looking at that code, I'm not
> sure it's the best way to start contributing. I don't understand what
> should be changed in that code, let alone find what generated that code and
> how it should be fixed...
>
>
> In the meantime, I guess the only other option would be the
> MATCH_RECOGNIZE?
> Do you think you could help me find what I did wrong in this query:
>
> SELECT *
> FROM customers
> MATCH_RECOGNIZE (
> PARTITION BY client_number
> ORDER BY proctime()
> MEASURES
> B.client_number as client_number,
> B.address as address
> PATTERN (A* B)
> DEFINE
> B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
> ) as T;
>
> I get the following error:
> SQL validation failed. Index 0 out of bounds for length 0
>
> Thanks a lot for your help!
>
> Laurent.
>
>
>>
>>
>> Best,
>> Leonard
>> [1] https://issues.apache.org/jira/browse/FLINK-20405
>>
>> On Fri, 27 Nov 2020 at 03:28, Leonard Xu  wrote:
>>
>>> Hi, Laurent
>>>
>>> Basically, I need to deduplicate, *but only keeping in the
>>> deduplication state the latest value of the changed column* to compare
>>> with. While here it seems to keep all previous values…
>>>
>>>
>>> You can use ` ORDER BY proctime() DESC`  in the deduplication query,
>>>  it will keep last row, I think that’s what you want.
>>>
>>> BTW, the deduplication has supported event time in 1.12, this will be
>>> available soon.
>>>
>>> Best,
>>> Leonard
>>>
>>>
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>>
>>
>> *euranova.eu *
>> *research.euranova.eu* 
>>
>> ♻ Be green, keep it on the screen
>>
>>
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu *
>
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen



-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard,


> From my understanding, your case is not a pure deduplication case: you want
> to keep both the previous record and the current record, so the
> deduplication query cannot satisfy your requirement.
>

Indeed, that's what I came to realise during our discussion on this email
chain. I'm sorry if it caused confusion. I'm still not sure how to express
this requirement in a concise way: "the need to deduplicate but let
previous values come back after a different value has appeared"


> Keeping the last row in deduplication always produces a changelog stream,
> because we need to retract the previous last value and send the new last
> value. You could use a connector that supports upsert sinks, like HBase, JDBC
> or the upsert-kafka connector, when sinking a changelog stream; the kafka
> connector can only accept an append-only stream, which is why you got that message.
>

That's what I understood indeed. But in my case I really do want to insert
and not upsert.
Just for information: the goal is to be able to historize kafka messages in
real-time. Each message could potentially be split to store information
in multiple tables (in my example: name and address would be inserted in 2
different tables), and the history should be kept and enriched with the
ingestion date. The fact that the kafka message can be split to be stored
in multiple tables creates that "deduplication" requirement (in my example
the address could have changed but not the name, and we don't want to add a
record with no business value in the table containing the names). And of
course, a field can be changed twice and as a result have the same value
again, and that's business information we do want to keep.


> The LAG function is used in over window aggregation and should work in
> your case, but unfortunately it looks like the LAG function is not implemented
> correctly; I created an issue[1] to fix this.
>

Thanks a lot! I'll follow the issue.
I would love to try to fix it... but quickly looking at that code, I'm not
sure it's the best way to start contributing. I don't understand what
should be changed in that code, let alone find what generated that code and
how it should be fixed...


In the meantime, I guess the only other option would be the MATCH_RECOGNIZE?
Do you think you could help me find what I did wrong in this query:

SELECT *
FROM customers
MATCH_RECOGNIZE (
PARTITION BY client_number
ORDER BY proctime()
MEASURES
B.client_number as client_number,
B.address as address
PATTERN (A* B)
DEFINE
B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
) as T;

I get the following error:
SQL validation failed. Index 0 out of bounds for length 0

Thanks a lot for your help!

Laurent.


>
>
> Best,
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-20405
>
> On Fri, 27 Nov 2020 at 03:28, Leonard Xu  wrote:
>
>> Hi, Laurent
>>
>> Basically, I need to deduplicate, *but only keeping in the deduplication
>> state the latest value of the changed column* to compare with. While
>> here it seems to keep all previous values…
>>
>>
>> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it
>> will keep last row, I think that’s what you want.
>>
>> BTW, the deduplication has supported event time in 1.12, this will be
>> available soon.
>>
>> Best,
>> Leonard
>>
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00
>
>
> *euranova.eu *
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen
>
>
>

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen


Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Konstantin Knauf
Hi Laurent,

With respect to Ververica Platform, we will support Flink 1.12 and add
"upsert-kafka" as a packaged connector in our next minor release which we
target for February.

Cheers,

Konstantin

On Thu, Nov 12, 2020 at 3:43 AM Jark Wu  wrote:

> Hi Laurent,
>
> 1. Deduplicate with keeping the first row will generate an append-only
> stream. But I guess you are expecting to keep the last row which generates
> an updating stream. An alternative way is you can
>  use the "changelog-json" format in this repo [1], it will convert the
> updating stream into append
> records with change flag encoded.
> 2. Yes. It will replace records with the same key, i.e. upsert statement.
> 3. I think it will be available in one or two months. There will be a
> first release candidate soon.
> You can watch on the dev ML. I'm not sure the plan of Ververica
> platform, cc @Konstantin Knauf 
>
> Best,
> Jark
>
> [1]:
> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>
> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> thanks for your quick reply. I was indeed expecting it.
>>
>> But that triggers the following questions:
>>
>>1. Is there another way to do this deduplication and generate an
>>append-only stream? Match Recognize? UDF? ...?
>>2. If I would put Postgres as a sink, what would happen? Will the
>>events happen or will they replace the record with the same key?
>>3. When will release-1.12 be available? And when would it be
>>integrated in the Ververica platform?
>>
>> Thanks a lot for your help!
>>
>> Best Regards,
>>
>> Laurent.
>>
>>
>>
>> On Wed, 11 Nov 2020 at 03:31, Jark Wu  wrote:
>>
>>> Hi Laurent,
>>>
>>> This is because the deduplicate node generates an updating stream,
>>> however Kafka currently only supports append-only stream.
>>> This can be addressed in release-1.12, because we introduce a new
>>> connector "upsert-kafka" which supports writing updating
>>>  streams into Kafka compacted topics.
>>>
>>> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
>>> ConsumerRecord#timestamp()?
>>> If yes, this is also supported in release-1.12 via metadata syntax in
>>> DDL [1]:
>>>
>>> CREATE TABLE kafka_table (
>>>   id BIGINT,
>>>   name STRING,
>>>   `timestamp` BIGINT METADATA  -- read timestamp
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'test-topic',
>>>   'format' = 'avro'
>>> )
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>
>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>> laurent.exste...@euranova.eu> wrote:
>>>
 Hello,

 I'm getting an error  in Flink SQL when reading from kafka,
 deduplicating records and sending them back to Kafka.

 The behavior I want is the following:

 *input:*
 | client_number | address |
 | --- | --- |
 | 1  | addr1 |
 | 1  | addr1 |
 | 1  | addr2 |
 | 1  | addr2 |
 | 1  | addr1 |
 | 1  | addr1 |

 *output:*
 | client_number | address |
 | --- | --- |
 | 1  | addr1 |
 | 1  | addr2 |
 | 1  | addr1 |

 The error seems to say that the type of stream created by the
 deduplication query is of "update & delete" type, while kafka only supports
 append-only:

 Unsupported query
 Table sink 'vvp.default.sat_customers_address' doesn't support
 consuming update and delete changes which is produced by node
 Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
 rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
 $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


 --> Is there a way to create an append only query from this kind of
 deduplication query (see my code here below)?
 --> Would that work if I would use, say, a Postgres sink?

 Bonus question: can we extract the Kafka ingestion date using Flink
 SQL? (here I generated a processing date to allow ordering during
 deduplication)

 P.S.: I'm on the Ververica Platform, but I guess this error is linked
 to Flink SQL itself.

 Thanks in advance for your help.

 Best Regards,

 Laurent.

 ---
 -- Read from customers kafka topic
 ---
 CREATE TEMPORARY TABLE customers (
 `client_number` INT,
 `name` VARCHAR(100),
 `address` VARCHAR(100)
 )
 COMMENT ''
 WITH (
 'connector' = 'kafka',
 'format' = 'csv',
 'properties.bootstrap.servers' = 

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Leonard Xu
Hi, Laurent
> 
> I'm not sure that would do what I want though. As far as I understand, the 
> deduplication query will always remember any values it has seen. So if I 
> have, for a specific primary key, the following values in another field: "a", 
> "a", "b", "b", "a", "a", the deduplication query could provide me with "a", 
> "b" as a result. But never with "a", "b", "a" (possibility to come back to a 
> previous value), which is what I need.

From my understanding, your case is not a pure deduplication case: you want to
keep both the previous record and the current record, so the deduplication query
cannot satisfy your requirement.


> Moreover, I tried putting proctime() DESC, and I get the message: The
> submitted query is not an append-only query. Only queries producing 
> exclusively new rows over time are supported at the moment. I do want an 
> append only query.
Keeping the last row in deduplication always produces a changelog stream, because
we need to retract the previous last value and send the new last value. You
could use a connector that supports upsert sinks, like HBase, JDBC or the
upsert-kafka connector, when sinking a changelog stream; the kafka connector can
only accept an append-only stream, which is why you got that message.
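
For example, a JDBC (e.g. Postgres) table with a declared primary key should be written in upsert mode; a rough sketch (URL, table name and credentials are placeholders):

CREATE TEMPORARY TABLE sat_customers_address_pg (
    `client_number` INT,
    `address` VARCHAR(100),
    PRIMARY KEY (`client_number`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:5432/<database>',
    'table-name' = 'sat_customers_address',
    'username' = '<user>',
    'password' = '<password>'
);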


> The LAG function makes complete sense to me here, since it would only compare 
> with the previous record. I just don't understand why it does not get the 
> value of the previous record, whatever offset I give it. Any idea what I 
> might be doing wrong?

The LAG function is used in over window aggregation and should work in your
case, but unfortunately it looks like the LAG function is not implemented
correctly; I created an issue[1] to fix this.
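
Once it is fixed, the query shape you are after would look roughly like this (a sketch only, filtering on the LAG value so that only changed rows are emitted, which keeps the result append-only):

SELECT client_number, address
FROM (
    SELECT client_number
    , address
    , LAG(address) OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS previous_address
    FROM customers
)
WHERE previous_address IS NULL OR address <> previous_address;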

 
 
Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-20405

> On Fri, 27 Nov 2020 at 03:28, Leonard Xu  > wrote:
> Hi, Laurent
> 
>> Basically, I need to deduplicate, but only keeping in the deduplication 
>> state the latest value of the changed column to compare with. While here it 
>> seems to keep all previous values…
> 
> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it will 
> keep last row, I think that’s what you want.
> 
> BTW, the deduplication has supported event time in 1.12, this will be 
> available soon.
> 
> Best,
> Leonard
> 
> 
> 
> -- 
> Laurent Exsteens
> Data Engineer
> (M) +32 (0) 486 20 48 36
> 
> EURA NOVA
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00 
> 
> euranova.eu 
> research.euranova.eu 
> ♻ Be green, keep it on the screen



Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard,

thank you for your answer.

I'm not sure that would do what I want though. As far as I understand, the
deduplication query will always remember any values it has seen. So if I
have, for a specific primary key, the following values in another field:
"a", "a", "b", "b", "a", "a", the deduplication query could provide me with
"a", "b" as a result. But never with "a", "b", "a" (possibility to come
back to a previous value), which is what I need.
Moreover, I tried putting proctime() DESC, and I get the message: The
submitted query is not an append-only query. Only queries producing
exclusively new rows over time are supported at the moment. I do want an
append only query.

The LAG function makes complete sense to me here, since it would only
compare with the previous record. I just don't understand why it does not
get the value of the previous record, whatever offset I give it. Any idea
what I might be doing wrong?

Thanks in advance.

Regards,

Laurent.

On Fri, 27 Nov 2020 at 03:28, Leonard Xu  wrote:

> Hi, Laurent
>
> Basically, I need to deduplicate, *but only keeping in the deduplication
> state the latest value of the changed column* to compare with. While here
> it seems to keep all previous values…
>
>
> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it
> will keep last row, I think that’s what you want.
>
> BTW, the deduplication has supported event time in 1.12, this will be
> available soon.
>
> Best,
> Leonard
>
>

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen


Re: FlinkSQL kafka->dedup->kafka

2020-11-26 Thread Leonard Xu
Hi, Laurent

> Basically, I need to deduplicate, but only keeping in the deduplication state 
> the latest value of the changed column to compare with. While here it seems 
> to keep all previous values…

You can use `ORDER BY proctime() DESC` in the deduplication query; it will
keep the last row, which I think is what you want.
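
For reference, a sketch of that keep-last-row variant, following the deduplication pattern in the docs (column names taken from your earlier queries; note that keeping the last row produces an updating stream rather than an append-only one):

SELECT client_number, address
FROM (
    SELECT client_number
    , address
    , ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime() DESC) AS rownum
    FROM customers
) WHERE rownum = 1;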

BTW, deduplication supports event time in 1.12; this will be available soon.

Best,
Leonard



Re: FlinkSQL kafka->dedup->kafka

2020-11-26 Thread Laurent Exsteens
Hello,

seems like LAG would probably be the right function to use.

However, I get unexpected results from it:

---
-- Deduplicate addresses
---
--CREATE TEMPORARY VIEW dedup_address as
SELECT *
FROM (
SELECT client_number
, address
, LAG(address) OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS previous_address
FROM customers
);

gives me the same value for LAG(address) as for address:

| client_number | address | previous_address |
| --- | --- | --- |
| cli1 | addr1 | addr1 |
| cli1 | addr1 | addr1 |
| cli1 | addr2 | addr2 |
| cli1 | addr2 | addr2 |
| cli1 | addr1 | addr1 |
| cli1 | addr3 | addr3 |
| cli1 | addr4 | addr4 |
| cli1 | addr5 | addr5 |
| cli1 | add5 | add5 |

What is even more strange is that I get the same result no matter what
offset I ask:

   - 1 (default)
   - 2
   - 0 (does it even make sense? how is it indexed?)
   - -1 (that definitely doesn't seem to make sense to me...)


What am I doing wrong here?

Thanks in advance for your help.

Regards,

Laurent.


On Thu, 12 Nov 2020 at 21:56, Laurent Exsteens 
wrote:

> I'm now trying with a MATCH_RECOGNIZE:
>
>
> SELECT *
> FROM customers
> MATCH_RECOGNIZE (
> PARTITION BY client_number
> ORDER BY proctime()
> MEASURES
> LAST(B.client_number) as client_number,
> LAST(B.address) as address
> PATTERN (A* B)
> DEFINE
> B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
> ) as T;
>
>
> However, I get the following error:
> SQL validation failed. Index 0 out of bounds for length 0
>
> I've read the documentation and tried different formulations, but I don't
> manage to make this work.
> However, it should be possible to express what I need since the examples
> of the documentation allow for way more complex patterns.
> What am I doing wrong?
>
> I would still prefer something simpler, such as the deduplication query (if it
> could work as I need it), as it would be a much harder sell to propose FlinkSQL
> as a way to lower the expertise required for our jobs if a "simple"
> deduplication already needs a complex query involving CEP.
>
> Thanks in advance for your help!
>
> Best Regards,
>
> Laurent.
>
>
>
> On Thu, 12 Nov 2020 at 17:22, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> I see what was my mistake: I was using a field in my ORDER BY, while it
>> only support proctime() for now.
>>
>> That allows me to create an append only stream, thanks a lot!
>>
>> However, it still does not allow me to do what I need:
>>
>>
>> *If I use both my primary key and changing column in PARTITION BY, then
>> it does not allow me to come back to a previous value of my changed column:*
>> SELECT client_number
>> , address
>> , proctime() as load_date
>> FROM (
>> SELECT client_number
>> , address
>> , ROW_NUMBER() OVER (PARTITION BY *client_number, **address* ORDER BY
>> proctime() ASC) AS rownum
>> FROM customers
>> ) where rownum = 1;
>> *input:*
>> | client_number | address | load_date |
>> | --- | --- | --- |
>> | 1 | addr1 | ts1 |
>> | 1 | addr1 | ts2 |
>> | 1 | addr2 | ts3 |
>> | 1 | addr2 | ts4 |
>> | 1 | addr1 | ts5 |
>> | 1 | addr1 | ts6 |
>>
>> *output*:
>> | client_number | address | load_date |
>> | --- | --- | --- |
>> | 1 | addr1 | ts1 |
>> | 1 | addr2 | ts3 |
>> *(the row | 1 | addr1 | ts5 | does not show --> I cannot change back to a
>> previous value :( )*
>>
>>
>>
>>
>>
>> *If however I only put my primary key in PARTITION BY, then I only get
>> the first value of my changed column:*
>> SELECT client_number
>> , address
>> , proctime() as load_date
>> FROM (
>> SELECT client_number
>> , address
>> , ROW_NUMBER() OVER (PARTITION BY *client_number* ORDER BY proctime() ASC)
>> AS rownum
>> FROM customers
>> ) where rownum = 1;
>> *input:*
>> | client_number | address | load_date |
>> | --- | --- | --- |
>> | 1 | addr1 | ts1 |
>> | 1 | addr1 | ts2 |
>> | 1 | addr2 | ts3 |
>> | 1 | addr2 | ts4 |
>> | 1 | addr1 | ts5 |
>> | 1 | addr1 | ts6 |
>>
>> *output*:
>> | client_number | address | load_date |
>> | --- | --- | --- |
>> | 1 | addr1 | ts1 |
>> *(the row | 1 | addr2 | ts3 | does not show :( )*
>> *(the row | 1 | addr1 | ts5 | does not show :( )*

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I'm now trying with a MATCH_RECOGNIZE:


SELECT *
FROM customers
MATCH_RECOGNIZE (
PARTITION BY client_number
ORDER BY proctime()
MEASURES
LAST(B.client_number) as client_number,
LAST(B.address) as address
PATTERN (A* B)
DEFINE
B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
) as T;


However, I get the following error:
SQL validation failed. Index 0 out of bounds for length 0

I've read the documentation and tried different formulations, but I don't
manage to make this work.
However, it should be possible to express what I need, since the examples in
the documentation allow for way more complex patterns.
What am I doing wrong?

I would still prefer something simpler, such as the deduplication query (if it
could work as I need it), as it would be a much harder sell to propose FlinkSQL
as a way to lower the expertise required for our jobs if a "simple"
deduplication already needs a complex query involving CEP.

Thanks in advance for your help!

Best Regards,

Laurent.



On Thu, 12 Nov 2020 at 17:22, Laurent Exsteens 
wrote:

> I see what was my mistake: I was using a field in my ORDER BY, while it
> only support proctime() for now.
>
> That allows me to create an append only stream, thanks a lot!
>
> However, it still does not allow me to do what I need:
>
>
> *If I use both my primary key and changing column in PARTITION BY, then it
> does not allow me to come back to a previous value of my changed column:*
> SELECT client_number
> , address
> , proctime() as load_date
> FROM (
> SELECT client_number
> , address
> , ROW_NUMBER() OVER (PARTITION BY *client_number, **address* ORDER BY
> proctime() ASC) AS rownum
> FROM customers
> ) where rownum = 1;
> *input:*
> | client_number | address | load_date |
> | --- | --- | --- |
> | 1 | addr1 | ts1 |
> | 1 | addr1 | ts2 |
> | 1 | addr2 | ts3 |
> | 1 | addr2 | ts4 |
> | 1 | addr1 | ts5 |
> | 1 | addr1 | ts6 |
>
> *output*:
> | client_number | address | load_date |
> | --- | --- | --- |
> | 1 | addr1 | ts1 |
> | 1 | addr2 | ts3 |
> *(the row | 1 | addr1 | ts5 | does not show --> I cannot change back to a
> previous value :( )*
>
>
>
>
>
> *If however I only put my primary key in PARTITION BY, then I only get the
> first value of my changed column:*
> SELECT client_number
> , address
> , proctime() as load_date
> FROM (
> SELECT client_number
> , address
> , ROW_NUMBER() OVER (PARTITION BY *client_number* ORDER BY proctime() ASC)
> AS rownum
> FROM customers
> ) where rownum = 1;
> *input:*
> | client_number | address | load_date |
> | --- | --- | --- |
> | 1 | addr1 | ts1 |
> | 1 | addr1 | ts2 |
> | 1 | addr2 | ts3 |
> | 1 | addr2 | ts4 |
> | 1 | addr1 | ts5 |
> | 1 | addr1 | ts6 |
>
> *output*:
> | client_number | address | load_date |
> | --- | --- | --- |
> | 1 | addr1 | ts1 |
> *(the row | 1 | addr2 | ts3 | does not show :( )*
> *(the row | 1 | addr1 | ts5 | does not show :( )*
>
>
>
> Basically, I need to deduplicate, *but only keeping in the deduplication
> state the latest value of the changed column* to compare with. While here
> it seems to keep all previous values...
>
> Is there a way to obtain the behavior I need (with this deduplication
> method or another one)?
>
> Thanks in advance.
>
> Best Regards,
>
> Laurent.
>
>
> On Thu, 12 Nov 2020 at 12:48, Jark Wu  wrote:
>
>> Hi Laurent,
>>
>> 1. Currently, it's impossible to convert deduplicate with last row into
>> an append-only stream.
>> 2. Yes, I think Ververica platform doesn't support 'changelog-json'
>> format natively.
>>
>> However, regarding your case, I think you can use keep first row on
>> client_number+address key.
>>
>> SELECT *
>> FROM (
>>    SELECT client_number, address, load_date,
>>      ROW_NUMBER() OVER
>>        (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
>>    FROM src)
>> WHERE rownum = 1
>>
>> That means, the duplicate records on the same client_number + address
>> will be ignored,
>> but the new value of address will be emitted as an append-only stream.
>>
>> 

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I see what was my mistake: I was using a field in my ORDER BY, while it
only supports proctime() for now.

That allows me to create an append only stream, thanks a lot!

However, it still does not allow me to do what I need:


*If I use both my primary key and changing column in PARTITION BY, then it
does not allow me to come back to a previous value of my changed column:*
SELECT client_number
, address
, proctime() as load_date
FROM (
SELECT client_number
, address
, ROW_NUMBER() OVER (PARTITION BY *client_number, **address* ORDER BY
proctime() ASC) AS rownum
FROM customers
) where rownum = 1;
*input:*
| client_number | address | load_date |
| --- | --- | --- |
| 1 | addr1 | ts1 |
| 1 | addr1 | ts2 |
| 1 | addr2 | ts3 |
| 1 | addr2 | ts4 |
| 1 | addr1 | ts5 |
| 1 | addr1 | ts6 |

*output*:
| client_number | address | load_date |
| --- | --- | --- |
| 1 | addr1 | ts1 |
| 1 | addr2 | ts3 |
*(the row | 1 | addr1 | ts5 | does not show --> I cannot change back to a
previous value :( )*





*If however I only put my primary key in PARTITION BY, then I only get the
first value of my changed column:*
SELECT client_number
, address
, proctime() as load_date
FROM (
SELECT client_number
, address
, ROW_NUMBER() OVER (PARTITION BY *client_number* ORDER BY proctime() ASC)
AS rownum
FROM customers
) where rownum = 1;
*input:*
| client_number | address | load_date |
| --- | --- | --- |
| 1 | addr1 | ts1 |
| 1 | addr1 | ts2 |
| 1 | addr2 | ts3 |
| 1 | addr2 | ts4 |
| 1 | addr1 | ts5 |
| 1 | addr1 | ts6 |

*output*:
| client_number | address | load_date |
| --- | --- | --- |
| 1 | addr1 | ts1 |
*(the row | 1 | addr2 | ts3 | does not show :( )*
*(the row | 1 | addr1 | ts5 | does not show :( )*



Basically, I need to deduplicate, *but only keeping in the deduplication
state the latest value of the changed column* to compare with. While here
it seems to keep all previous values...

Is there a way to obtain the behavior I need (with this deduplication
method or another one)?

Thanks in advance.

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 12:48, Jark Wu  wrote:

> Hi Laurent,
>
> 1. Currently, it's impossible to convert deduplicate with last row into an
> append-only stream.
> 2. Yes, I think Ververica platform doesn't support 'changelog-json' format
> natively.
>
> However, regarding your case, I think you can use keep first row on
> client_number+address key.
>
> SELECT *
> FROM (
>    SELECT client_number, address, load_date,
>      ROW_NUMBER() OVER
>        (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
>    FROM src)
> WHERE rownum = 1
>
> That means, the duplicate records on the same client_number + address will
> be ignored,
> but the new value of address will be emitted as an append-only stream.
>
> Hope this helps you.
>
> Best,
> Jark
>
>
> On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> thanks again for your quick response!
>>
>> I tried multiple variants of my query by:
>> - specifying only the primary key in the PARTITION BY clause
>> - changing the order to DESC to keep the last row
>>
>> --> I unfortunately always get the same error message.
>> If I try to make a simple select on the result of this query, I also get
>> the following error: The submitted query is not an append-only query.
>> Only queries producing exclusively new rows over time are supported at the
>> moment. So whatever change I make, I never get an append-only query -->
>> Is there something I missed?
>>
>> I also tried to write to kafka as changelog-json, but I got the answer: The
>> sink connector for table `vvp`.`default`.`sat_customers_address` could not
>> be created. 'changelog-json' is not a supported sink format. Supported sink
>> formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
>> (maybe because I'm on the Ververica platform?)
>> This also seem to require an extra kafka topic then, so not ideal.
>>
>>
>> *I'm starting to wonder if 

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Jark Wu
Hi Laurent,

1. Currently, it's impossible to convert a deduplicate-with-last-row query into
an append-only stream.
2. Yes, I think Ververica platform doesn't support 'changelog-json' format
natively.

However, regarding your case, I think you can keep the first row on the
client_number+address key.

SELECT *
FROM (
   SELECT client_number, address, load_date,
     ROW_NUMBER() OVER
       (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
   FROM src)
WHERE rownum = 1

That means, the duplicate records on the same client_number + address will
be ignored,
but the new value of address will be emitted as an append-only stream.

Hope this helps you.

Best,
Jark


On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens 
wrote:

> Hi Jark,
>
> thanks again for your quick response!
>
> I tried multiple variants of my query by:
> - specifying only the primary key in the PARTITION BY clause
> - changing the order to DESC to keep the last row
>
> --> I unfortunately always get the same error message.
> If I try to make a simple select on the result of this query, I also get
> the following error: The submitted query is not an append-only query.
> Only queries producing exclusively new rows over time are supported at the
> moment. So whatever change I make, I never get an append-only query -->
> Is there something I missed?
>
> I also tried to write to kafka as changelog-json, but I got the answer: The
> sink connector for table `vvp`.`default`.`sat_customers_address` could not
> be created. 'changelog-json' is not a supported sink format. Supported sink
> formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
> (maybe because I'm on the Ververica platform?)
> This also seem to require an extra kafka topic then, so not ideal.
>
>
> *I'm starting to wonder if the deduplication query is really what I need.*
>
> What I need is:
> - to forward only the records where some columns (ideally configurable)
> change for a specific primary key.
> - in realtime (no windowing)
> - and have as a result an append-only stream.
>
> Like this:
>
> *input:*
> | client_number | address | load_date |
> | --- | --- | --- |
> | 1 | addr1 | ts1 |
> | 1 | addr1 | ts2 |
> | 1 | addr2 | ts3 |
> | 1 | addr2 | ts4 |
> | 1 | addr1 | ts5 |
> | 1 | addr1 | ts6 |
>
> *output* (this is what should ultimately be published to Kafka and later
> inserted in an RDBMS):
> | client_number | address | load_date |
> | --- | --- | --- |
> | 1 | addr1 | ts1 |
> | 1 | addr2 | ts3 |
> | 1 | addr1 | ts5 |
>
>
> --> is this deduplication query the right fit therefore?
>  - if yes, how should it be written to generate an append-only stream?
>  - If not, are there other options? (Match Recognize, UDF, ?)
>
> Thanks a lot for your much appreciated help :).
>
> Best Regards,
>
> Laurent.
>
>
> On Thu, 12 Nov 2020 at 07:26, Jark Wu  wrote:
>
>> Hi Laurent,
>>
>> > What I want is a record to be forwarded only if some of the columns
>> change
>>
>> IIUC, what you want is still deduplication with the last row.
>> Keeping the first row will drop all the duplicate rows on the same primary
>> key.
>> Keeping the last row will emit updates when there are duplicate rows on the
>> same primary key, which means column value changes will notify downstream
>> operators.
>> The difference between keeping the first row and the last row is specified by
>> the direction of the ORDER BY clause [1].
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>
>>
>>
>>
>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
>> laurent.exste...@euranova.eu> wrote:
>>
>>> Thanks.
>>>
>>> I actually want the first row. What I want is a record to be forwarded
>>> only if some of the columns change (of course keyed by the primary key). I
>>> used rownum = 1, is that not selecting the first row?
>>>
>>> How can I adapt my query to let only the row effectively changing the
>>> values pass, as an append only stream?
>>>
>>> If not possible, I'll look at converting it after. But I prefer a
>>> solution in the deduplication query.
>>> The goal is to show my customer that what they want to achieve is very
>>> straightforward in flink SQL, so the simpler the queries the better. I need
>>> to present my conclusions tomorrow.
>>>
>>> Thanks a lot already for your help!
>>>
>>> Best regards,
>>>
>>> Laurent.
>>>
>>>
>>> On Thu, Nov 12, 2020, 03:43 Jark Wu  wrote:
>>>
 Hi Laurent,

 1. Deduplicate with keeping the first row will 

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
Hi Jark,

thanks again for your quick response!

I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row

--> I unfortunately always get the same error message.
If I try to make a simple select on the result of this query, I also get
the following error: The submitted query is not an append-only query. Only
queries producing exclusively new rows over time are supported at the moment.
So whatever change I make, I never get an append-only query --> Is there
something I missed?

I also tried to write to kafka as changelog-json, but I got the answer: The
sink connector for table `vvp`.`default`.`sat_customers_address` could not
be created. 'changelog-json' is not a supported sink format. Supported sink
formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
(maybe because I'm on the Ververica platform?)
This also seem to require an extra kafka topic then, so not ideal.


*I'm starting to wonder if the deduplication query is really what I need.*

What I need is:
- to forward only the records where some columns (ideally configurable)
change for a specific primary key.
- in realtime (no windowing)
- and have as a result an append-only stream.

Like this:

*input:*
| client_number | address | load_date |
| --- | --- | --- |
| 1 | addr1 | ts1 |
| 1 | addr1 | ts2 |
| 1 | addr2 | ts3 |
| 1 | addr2 | ts4 |
| 1 | addr1 | ts5 |
| 1 | addr1 | ts6 |

*output* (this is what should ultimately be published to Kafka and later
inserted in an RDBMS):
| client_number | address | load_date |
| --- | --- | --- |
| 1 | addr1 | ts1 |
| 1 | addr2 | ts3 |
| 1 | addr1 | ts5 |


--> is this deduplication query the right fit therefore?
 - if yes, how should it be written to generate an append-only stream?
 - If not, are there other options? (Match Recognize, UDF, ?)

Thanks a lot for your much appreciated help :).

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 07:26, Jark Wu  wrote:

> Hi Laurent,
>
> > What I want is a record to be forwarded only if some of the columns
> change
>
> IIUC, what you want is still deduplication with the last row.
> Keeping the first row will drop all the duplicate rows on the same primary
> key.
> Keeping the last row will emit updates when there are duplicate rows on the
> same primary key, which means column value changes will notify downstream
> operators.
> The difference between keeping the first row and the last row is specified by
> the direction of the ORDER BY clause [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
>
>
>
> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Thanks.
>>
>> I actually want the first row. What I want is a record to be forwarded
>> only if some of the columns change (of course keyed by the primary key). I
>> used rownum = 1, is that not selecting the first row?
>>
>> How can I adapt my query to let only the row effectively changing the
>> values pass, as an append only stream?
>>
>> If not possible, I'll look at converting it after. But I prefer a
>> solution in the deduplication query.
>> The goal is to show my customer that what they want to achieve is very
>> straightforward in flink SQL, so the simpler the queries the better. I need
>> to present my conclusions tomorrow.
>>
>> Thanks a lot already for your help!
>>
>> Best regards,
>>
>> Laurent.
>>
>>
>> On Thu, Nov 12, 2020, 03:43 Jark Wu  wrote:
>>
>>> Hi Laurent,
>>>
>>> 1. Deduplicate with keeping the first row will generate an append-only
>>> stream. But I guess you are expecting to keep the last row which generates
>>> an updating stream. An alternative way is you can
>>>  use the "changelog-json" format in this repo [1], it will convert the
>>> updating stream into append
>>> records with change flag encoded.
>>> 2. Yes. It will replace records with the same key, i.e. upsert
>>> statement.
>>> 3. I think it will be available in one or two months. There will be a
>>> first release candidate soon.
>>> You can watch on the dev ML. I'm not sure the plan of Ververica
>>> platform, cc @Konstantin Knauf 
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>
>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
>>> laurent.exste...@euranova.eu> wrote:
>>>
 Hi Jark,

 thanks for your quick reply. I was indeed expecting it.

Re: FlinkSQL kafka->dedup->kafka

2020-11-11 Thread Jark Wu
Hi Laurent,

1. Deduplication keeping the first row will generate an append-only stream. But
I guess you are expecting to keep the last row, which generates an updating
stream. An alternative is to use the "changelog-json" format from this repo [1];
it will convert the updating stream into append-only records with a change flag
encoded (a sketch of such a sink table is included after the link below).
2. Yes. It will replace records with the same key, i.e. upsert statement.
3. I think it will be available in one or two months. There will be a first
release candidate soon.
You can follow it on the dev ML. I'm not sure about the plan for the Ververica
platform, cc @Konstantin Knauf 

Best,
Jark

[1]:
https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
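
For illustration, a sink table using that format might look roughly like this (a sketch only; topic and bootstrap servers are placeholders, and it assumes the changelog-json format jar from [1] is on the classpath):

CREATE TEMPORARY TABLE dedup_address_changelog (
  `client_number` INT,
  `address` VARCHAR(100)
) WITH (
  'connector' = 'kafka',
  'topic' = '<changelog-topic>',
  'properties.bootstrap.servers' = '<broker>:9092',
  'format' = 'changelog-json'
);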

On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens 
wrote:

> Hi Jark,
>
> thanks for your quick reply. I was indeed expecting it.
>
> But that triggers the following questions:
>
>1. Is there another way to do this deduplication and generate an
>append-only stream? Match Recognize? UDF? ...?
>2. If I would put Postgres as a sink, what would happen? Will the
>events happen or will they replace the record with the same key?
>3. When will release-1.12 be available? And when would it be
>integrated in the Ververica platform?
>
> Thanks a lot for your help!
>
> Best Regards,
>
> Laurent.
>
>
>
> On Wed, 11 Nov 2020 at 03:31, Jark Wu  wrote:
>
>> Hi Laurent,
>>
>> This is because the deduplicate node generates an updating stream,
>> however Kafka currently only supports append-only stream.
>> This can be addressed in release-1.12, because we introduce a new
>> connector "upsert-kafka" which supports writing updating
>>  streams into Kafka compacted topics.
>>
>> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
>> ConsumerRecord#timestamp()?
>> If yes, this is also supported in release-1.12 via metadata syntax in DDL
>> [1]:
>>
>> CREATE TABLE kafka_table (
>>   id BIGINT,
>>   name STRING,
>>   `timestamp` BIGINT METADATA  -- read timestamp
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'test-topic',
>>   'format' = 'avro'
>> )
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>
>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>> laurent.exste...@euranova.eu> wrote:
>>
>>> Hello,
>>>
>>> I'm getting an error  in Flink SQL when reading from kafka,
>>> deduplicating records and sending them back to Kafka.
>>>
>>> The behavior I want is the following:
>>>
>>> *input:*
>>> | client_number | address |
>>> | --- | --- |
>>> | 1  | addr1 |
>>> | 1  | addr1 |
>>> | 1  | addr2 |
>>> | 1  | addr2 |
>>> | 1  | addr1 |
>>> | 1  | addr1 |
>>>
>>> *output:*
>>> | client_number | address |
>>> | --- | --- |
>>> | 1  | addr1 |
>>> | 1  | addr2 |
>>> | 1  | addr1 |
>>>
>>> The error seems to say that the type of stream created by the
>>> deduplication query is of "update & delete" type, while kafka only supports
>>> append-only:
>>>
>>> Unsupported query
>>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>
>>>
>>> --> Is there a way to create an append only query from this kind of
>>> deduplication query (see my code here below)?
>>> --> Would that work if I would use, say, a Postgres sink?
>>>
>>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>>> (here I generated a processing date to allow ordering during deduplication)
>>>
>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>>> Flink SQL itself.
>>>
>>> Thanks in advance for your help.
>>>
>>> Best Regards,
>>>
>>> Laurent.
>>>
>>> ---
>>> -- Read from customers kafka topic
>>> ---
>>> CREATE TEMPORARY TABLE customers (
>>> `client_number` INT,
>>> `name` VARCHAR(100),
>>> `address` VARCHAR(100)
>>> )
>>> COMMENT ''
>>> WITH (
>>> 'connector' = 'kafka',
>>> 'format' = 'csv',
>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>> 'properties.group.id' = 'flinkSQL',
>>> 'topic' = 'customers',
>>> 'csv.field-delimiter' = ';',
>>> 'scan.startup.mode' = 'earliest-offset'
>>> );
>>>
>>>
>>>
>>> ---
>>> -- Add metadata
>>> ---
>>> CREATE TEMPORARY VIEW metadata AS
>>> SELECT *
>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>> , current_timestamp AS load_date
>>> , 'Kafka topic: customers' 

Re: FlinkSQL kafka->dedup->kafka

2020-11-11 Thread Laurent Exsteens
Hi Jark,

thanks for your quick reply. I was indeed expecting it.

But that triggers the following questions:

   1. Is there another way to do this deduplication and generate an
   append-only stream? Match Recognize? UDF? ...?
   2. If I put Postgres as a sink, what would happen? Will the events be
   appended or will they replace the record with the same key?
   3. When will release-1.12 be available? And when would it be integrated
   in the Ververica platform?

Thanks a lot for your help!

Best Regards,

Laurent.



On Wed, 11 Nov 2020 at 03:31, Jark Wu  wrote:

> Hi Laurent,
>
> This is because the deduplicate node generates an updating stream, however
> Kafka currently only supports append-only stream.
> This can be addressed in release-1.12, because we introduce a new
> connector "upsert-kafka" which supports writing updating
>  streams into Kafka compacted topics.
>
> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
> ConsumerRecord#timestamp()?
> If yes, this is also supported in release-1.12 via metadata syntax in DDL
> [1]:
>
> CREATE TABLE kafka_table (
>   id BIGINT,
>   name STRING,
>   `timestamp` BIGINT METADATA  -- read timestamp
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-topic',
>   'format' = 'avro'
> )
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>
> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hello,
>>
>> I'm getting an error  in Flink SQL when reading from kafka, deduplicating
>> records and sending them back to Kafka.
>>
>> The behavior I want is the following:
>>
>> *input:*
>> | client_number | address |
>> | --- | --- |
>> | 1  | addr1 |
>> | 1  | addr1 |
>> | 1  | addr2 |
>> | 1  | addr2 |
>> | 1  | addr1 |
>> | 1  | addr1 |
>>
>> *output:*
>> | client_number | address |
>> | --- | --- |
>> | 1  | addr1 |
>> | 1  | addr2 |
>> | 1  | addr1 |
>>
>> The error seems to say that the type of stream created by the
>> deduplication query is of "update & delete" type, while kafka only supports
>> append-only:
>>
>> Unsupported query
>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>
>>
>> --> Is there a way to create an append only query from this kind of
>> deduplication query (see my code here below)?
>> --> Would that work if I would use, say, a Postgres sink?
>>
>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>> (here I generated a processing date to allow ordering during deduplication)
>>
>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>> Flink SQL itself.
>>
>> Thanks in advance for your help.
>>
>> Best Regards,
>>
>> Laurent.
>>
>> ---
>> -- Read from customers kafka topic
>> ---
>> CREATE TEMPORARY TABLE customers (
>> `client_number` INT,
>> `name` VARCHAR(100),
>> `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>> 'properties.group.id' = 'flinkSQL',
>> 'topic' = 'customers',
>> 'csv.field-delimiter' = ';',
>> 'scan.startup.mode' = 'earliest-offset'
>> );
>>
>>
>>
>> ---
>> -- Add metadata
>> ---
>> CREATE TEMPORARY VIEW metadata AS
>> SELECT *
>> , sha256(cast(client_number as STRING)) AS customer_pk
>> , current_timestamp AS load_date
>> , 'Kafka topic: customers' AS record_source
>> FROM customers;
>>
>>
>>
>> ---
>> -- Deduplicate addresses
>> ---
>> CREATE TEMPORARY VIEW dedup_address as
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , address
>> FROM (
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , record_source
>> , address
>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>> ORDER BY load_date ASC) AS rownum
>> FROM metadata
>> ) where rownum = 1;
>>
>>
>>
>>
>>
>>
>> ---
>> -- Send to sat_customers_address kafka topic
>> ---
>> CREATE TEMPORARY TABLE sat_customers_address (
>> `customer_pk` VARCHAR(64),
>> `client_number` INT,
>> `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' =
>> 

Re: FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Jark Wu
Hi Laurent,

This is because the deduplicate node generates an updating stream, whereas the
Kafka connector currently only supports append-only streams.
This can be addressed in release-1.12, because we introduce a new connector,
"upsert-kafka", which supports writing updating streams into Kafka compacted
topics.
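
For reference, an upsert-kafka sink table would look roughly like this (a sketch against the planned 1.12 syntax; topic and servers are placeholders, and the connector requires a primary key plus key/value formats):

CREATE TEMPORARY TABLE sat_customers_address_upsert (
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`client_number`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = '<broker>:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);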

Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
ConsumerRecord#timestamp()?
If yes, this is also supported in release-1.12 via metadata syntax in DDL
[1]:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` BIGINT METADATA  -- read timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
)

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens 
wrote:

> Hello,
>
> I'm getting an error  in Flink SQL when reading from kafka, deduplicating
> records and sending them back to Kafka.
>
> The behavior I want is the following:
>
> *input:*
> | client_number | address |
> | --- | --- |
> | 1  | addr1 |
> | 1  | addr1 |
> | 1  | addr2 |
> | 1  | addr2 |
> | 1  | addr1 |
> | 1  | addr1 |
>
> *output:*
> | client_number | address |
> | --- | --- |
> | 1  | addr1 |
> | 1  | addr2 |
> | 1  | addr1 |
>
> The error seems to say that the type of stream created by the
> deduplication query is of "update & delete" type, while kafka only supports
> append-only:
>
> Unsupported query
> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
> update and delete changes which is produced by node
> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>
>
> --> Is there a way to create an append only query from this kind of
> deduplication query (see my code here below)?
> --> Would that work if I would use, say, a Postgres sink?
>
> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
> (here I generated a processing date to allow ordering during deduplication)
>
> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
> Flink SQL itself.
>
> Thanks in advance for your help.
>
> Best Regards,
>
> Laurent.
>
> ---
> -- Read from customers kafka topic
> ---
> CREATE TEMPORARY TABLE customers (
> `client_number` INT,
> `name` VARCHAR(100),
> `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
> 'connector' = 'kafka',
> 'format' = 'csv',
> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
> 'properties.group.id' = 'flinkSQL',
> 'topic' = 'customers',
> 'csv.field-delimiter' = ';',
> 'scan.startup.mode' = 'earliest-offset'
> );
>
>
>
> ---
> -- Add metadata
> ---
> CREATE TEMPORARY VIEW metadata AS
> SELECT *
> , sha256(cast(client_number as STRING)) AS customer_pk
> , current_timestamp AS load_date
> , 'Kafka topic: customers' AS record_source
> FROM customers;
>
>
>
> ---
> -- Deduplicate addresses
> ---
> CREATE TEMPORARY VIEW dedup_address as
> SELECT customer_pk
> , client_number
> , load_date
> , address
> FROM (
> SELECT customer_pk
> , client_number
> , load_date
> , record_source
> , address
> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
> ORDER BY load_date ASC) AS rownum
> FROM metadata
> ) where rownum = 1;
>
>
>
>
>
>
> ---
> -- Send to sat_customers_address kafka topic
> ---
> CREATE TEMPORARY TABLE sat_customers_address (
> `customer_pk` VARCHAR(64),
> `client_number` INT,
> `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
> 'connector' = 'kafka',
> 'format' = 'csv',
> 'properties.bootstrap.servers' =
> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
> 'properties.group.id' = 'flinkSQL',
> 'topic' = 'sat_customers_address'
> );
>
> INSERT INTO sat_customers_address
> SELECT customer_pk
> , client_number
> , address
> FROM dedup_address;
>
>
>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu *
>
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen


FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Laurent Exsteens
Hello,

I'm getting an error  in Flink SQL when reading from kafka, deduplicating
records and sending them back to Kafka.

The behavior I want is the following:

*input:*
| client_number | address |
| --- | --- |
| 1  | addr1 |
| 1  | addr1 |
| 1  | addr2 |
| 1  | addr2 |
| 1  | addr1 |
| 1  | addr1 |

*output:*
| client_number | address |
| --- | --- |
| 1  | addr1 |
| 1  | addr2 |
| 1  | addr1 |

The error seems to say that the type of stream created by the deduplication
query is of "update & delete" type, while kafka only supports append-only:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming
update and delete changes which is produced by node
Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
$2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


--> Is there a way to create an append only query from this kind of
deduplication query (see my code here below)?
--> Would that work if I would use, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL?
(here I generated a processing date to allow ordering during deduplication)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to
Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

---
-- Read from customers kafka topic
---
CREATE TEMPORARY TABLE customers (
`client_number` INT,
`name` VARCHAR(100),
`address` VARCHAR(100)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'csv',
'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
'properties.group.id' = 'flinkSQL',
'topic' = 'customers',
'csv.field-delimiter' = ';',
'scan.startup.mode' = 'earliest-offset'
);



---
-- Add metadata
---
CREATE TEMPORARY VIEW metadata AS
SELECT *
, sha256(cast(client_number as STRING)) AS customer_pk
, current_timestamp AS load_date
, 'Kafka topic: customers' AS record_source
FROM customers;



---
-- Deduplicate addresses
---
CREATE TEMPORARY VIEW dedup_address as
SELECT customer_pk
, client_number
, load_date
, address
FROM (
SELECT customer_pk
, client_number
, load_date
, record_source
, address
, ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER
BY load_date ASC) AS rownum
FROM metadata
) where rownum = 1;






---
-- Send to sat_customers_address kafka topic
---
CREATE TEMPORARY TABLE sat_customers_address (
`customer_pk` VARCHAR(64),
`client_number` INT,
`address` VARCHAR(100)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'csv',
'properties.bootstrap.servers' =
'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
'properties.group.id' = 'flinkSQL',
'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
, client_number
, address
FROM dedup_address;




-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen