Hi Jark,

thanks for your quick reply. I was indeed expecting it.

But that triggers the following questions:

   1. Is there another way to do this deduplication and generate an
   append-only stream? Match Recognize? UDF? ...?
   2. If I would put Postgres as a sink, what would happen? Will the events
   happen or will they replace the record with the same key?
   3. When will release-1.12 be available? And when would it be integrated
   in the Ververica platform?

Thanks a lot for your help!

Best Regards,

Laurent.



On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
>
> This is because the deduplicate node generates an updating stream, however
> Kafka currently only supports append-only stream.
> This can be addressed in release-1.12, because we introduce a new
> connector "upsert-kafka" which supports writing updating
>  streams into Kafka compacted topics.
>
> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
> ConsumerRecord#timestamp()?
> If yes, this is also supported in release-1.12 via metadata syntax in DDL
> [1]:
>
> CREATE TABLE kafka_table (
>   id BIGINT,
>   name STRING,
>   timestamp BIGINT METADATA,  -- read timestamp
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-topic',
>   'format' = 'avro'
> )
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>
> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hello,
>>
>> I'm getting an error  in Flink SQL when reading from kafka, deduplicating
>> records and sending them back to Kafka.
>>
>> The behavior I want is the following:
>>
>> *input:*
>> | client_number | address |
>> | ------------------- | ----------- |
>> | 1                      | addr1     |
>> | 1                      | addr1     |
>> | 1                      | addr2     |
>> | 1                      | addr2     |
>> | 1                      | addr1     |
>> | 1                      | addr1     |
>>
>> *output:*
>> | client_number | address |
>> | ------------------- | ----------- |
>> | 1                      | addr1     |
>> | 1                      | addr2     |
>> | 1                      | addr1     |
>>
>> The error seems to say that the type of stream created by the
>> deduplication query is of "update & delete" type, while kafka only supports
>> append-only:
>>
>> Unsupported query
>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>
>>
>> --> Is there a way to create an append only query from this kind of
>> deduplication query (see my code here below)?
>> --> Would that work if I would use, say, a Postgres sink?
>>
>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>> (here I generated a processing date to allow ordering during deduplication)
>>
>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>> Flink SQL itself.
>>
>> Thanks in advance for your help.
>>
>> Best Regards,
>>
>> Laurent.
>>
>> -----------------------------------
>> -- Read from customers kafka topic
>> -----------------------------------
>> CREATE TEMPORARY TABLE customers (
>> `client_number` INT,
>> `name` VARCHAR(100),
>> `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>> 'properties.group.id' = 'flinkSQL',
>> 'topic' = 'customers',
>> 'csv.field-delimiter' = ';',
>> 'scan.startup.mode' = 'earliest-offset'
>> );
>>
>>
>>
>> -----------------------------------
>> -- Add metadata
>> -----------------------------------
>> CREATE TEMPORARY VIEW metadata AS
>> SELECT *
>> , sha256(cast(client_number as STRING)) AS customer_pk
>> , current_timestamp AS load_date
>> , 'Kafka topic: customers' AS record_source
>> FROM customers;
>>
>>
>>
>> -----------------------------------
>> -- Deduplicate addresses
>> -----------------------------------
>> CREATE TEMPORARY VIEW dedup_address as
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , address
>> FROM (
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , record_source
>> , address
>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>> ORDER BY load_date ASC) AS rownum
>> FROM metadata
>> ) where rownum = 1;
>>
>>
>>
>>
>>
>>
>> -----------------------------------
>> -- Send to sat_customers_address kafka topic
>> -----------------------------------
>> CREATE TEMPORARY TABLE sat_customers_address (
>> `customer_pk` VARCHAR(64),
>> `client_number` INT,
>> `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' =
>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>> 'properties.group.id' = 'flinkSQL',
>> 'topic' = 'sat_customers_address'
>> );
>>
>> INSERT INTO sat_customers_address
>> SELECT customer_pk
>> , client_number
>> , address
>> FROM dedup_address;
>>
>>
>>
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>>
>> Rue Emile Francqui, 4
>>
>> 1435 Mont-Saint-Guibert
>>
>> (T) +32 10 75 02 00
>>
>> *euranova.eu <http://euranova.eu/>*
>>
>> *research.euranova.eu* <http://research.euranova.eu/>
>>
>> ♻ Be green, keep it on the screen
>
>

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu <http://euranova.eu/>*

*research.euranova.eu* <http://research.euranova.eu/>

-- 
♻ Be green, keep it on the screen

Reply via email to