Hi Jark,

Thanks for your quick reply. That is indeed what I was expecting.

But it raises the following questions:

   1. Is there another way to do this deduplication and generate an
   append-only stream? MATCH_RECOGNIZE? A UDF? Something else?
   2. If I used Postgres as a sink, what would happen? Would the events
   be appended, or would they replace the record with the same key?
   3. When will release 1.12 be available? And when will it be integrated
   into the Ververica Platform?
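
To make question 2 concrete, here is a minimal Python model (plain Python, not a Flink or JDBC API) of two ways a sink could treat an updating stream. The change kinds mirror Flink's changelog row kinds (+I insert, -U update-before, +U update-after, -D delete). The record shape, the key, and the sample changelog are assumptions for illustration only.

```python
from typing import Dict, List, Tuple

# (kind, key, value); kind mirrors Flink's changelog row kinds:
# "+I" insert, "-U" update-before, "+U" update-after, "-D" delete.
Change = Tuple[str, str, str]

def append_sink(changes: List[Change]) -> List[Change]:
    """Append-only sink: every change is stored as a new row."""
    return list(changes)

def upsert_sink(changes: List[Change]) -> Dict[str, str]:
    """Upsert sink: one row per key. Inserts and update-afters write
    the row, deletes remove it, and update-befores are skipped because
    the following update-after overwrites the same key anyway."""
    table: Dict[str, str] = {}
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            table[key] = value
        elif kind == "-D":
            table.pop(key, None)
    return table

# Hypothetical changelog for client 1 moving from addr1 to addr2.
changes: List[Change] = [("+I", "1", "addr1"),
                         ("-U", "1", "addr1"),
                         ("+U", "1", "addr2")]
print(len(append_sink(changes)))  # 3
print(upsert_sink(changes))       # {'1': 'addr2'}
```

An append-only sink ends up with three rows. An upsert sink keyed on the client ends up with a single row holding the latest address.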

Thanks a lot for your help!

Best Regards,


On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
> This is because the deduplication node generates an updating stream, whereas
> the Kafka connector currently only supports append-only streams.
> This will be addressed in release-1.12, which introduces a new
> connector, "upsert-kafka", that supports writing updating
> streams into Kafka compacted topics.
> Does the "Kafka ingestion date" refer to the "Kafka message timestamp", i.e.
> ConsumerRecord#timestamp()?
> If so, this is also supported in release-1.12 via the metadata syntax in DDL
> [1]:
> CREATE TABLE kafka_table (
>   id BIGINT,
>   name STRING,
>   `timestamp` TIMESTAMP(3) METADATA  -- read the Kafka record timestamp
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-topic',
>   'format' = 'avro'
> )
> Best,
> Jark
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>> Hello,
>> I'm getting an error in Flink SQL when reading from Kafka, deduplicating
>> records and sending them back to Kafka.
>> The behavior I want is the following:
>> *input:*
>> | client_number | address |
>> | ------------- | ------- |
>> | 1             | addr1   |
>> | 1             | addr1   |
>> | 1             | addr2   |
>> | 1             | addr2   |
>> | 1             | addr1   |
>> | 1             | addr1   |
>> *output:*
>> | client_number | address |
>> | ------------- | ------- |
>> | 1             | addr1   |
>> | 1             | addr2   |
>> | 1             | addr1   |
>> The error seems to say that the stream produced by the deduplication query
>> is of the "update & delete" type, while Kafka only supports append-only
>> streams:
>> Unsupported query
>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>> --> Is there a way to create an append-only query from this kind of
>> deduplication query (see my code below)?
>> --> Would that work if I used, say, a Postgres sink?
>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>> (Here I generated a processing date to allow ordering during deduplication.)
>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>> Flink SQL itself.
>> Thanks in advance for your help.
>> Best Regards,
>> Laurent.
>> -----------------------------------
>> -- Read from customers kafka topic
>> -----------------------------------
>> CREATE TEMPORARY TABLE customers (
>> `client_number` INT,
>> `name` VARCHAR(100),
>> `address` VARCHAR(100)
>> )
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>> 'properties.group.id' = 'flinkSQL',
>> 'topic' = 'customers',
>> 'csv.field-delimiter' = ';',
>> 'scan.startup.mode' = 'earliest-offset'
>> );
>> -----------------------------------
>> -- Add metadata
>> -----------------------------------
>> CREATE TEMPORARY VIEW metadata AS
>> SELECT *
>> , sha256(cast(client_number as STRING)) AS customer_pk
>> , current_timestamp AS load_date
>> , 'Kafka topic: customers' AS record_source
>> FROM customers;
>> -----------------------------------
>> -- Deduplicate addresses
>> -----------------------------------
>> CREATE TEMPORARY VIEW dedup_address as
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , address
>> FROM (
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , record_source
>> , address
>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>> ORDER BY load_date ASC) AS rownum
>> FROM metadata
>> ) where rownum = 1;
>> -----------------------------------
>> -- Send to sat_customers_address kafka topic
>> -----------------------------------
>> CREATE TEMPORARY TABLE sat_customers_address (
>> `customer_pk` VARCHAR(64),
>> `client_number` INT,
>> `address` VARCHAR(100)
>> )
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' =
>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>> 'properties.group.id' = 'flinkSQL',
>> 'topic' = 'sat_customers_address'
>> );
>> INSERT INTO sat_customers_address
>> SELECT customer_pk
>> , client_number
>> , address
>> FROM dedup_address;
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>> *euranova.eu <http://euranova.eu/>*
>> *research.euranova.eu* <http://research.euranova.eu/>
>> ♻ Be green, keep it on the screen
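
The quoted deduplication query and the desired output table compute different things. The minimal Python model below (plain Python, not Flink API) replays the input table in list order, which stands in for the load_date ordering. customer_pk is left out of the key because it is a hash of client_number and does not change the partitioning. The function names are hypothetical.

```python
from typing import Dict, List, Set, Tuple

Row = Tuple[int, str]  # (client_number, address)

def first_row_per_key(rows: List[Row]) -> List[Row]:
    """Final result of keeping rownum = 1 per (client_number, address),
    as in ROW_NUMBER() OVER (PARTITION BY client_number, address
    ORDER BY load_date ASC): only the first occurrence of each pair."""
    seen: Set[Row] = set()
    out: List[Row] = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

def emit_on_change(rows: List[Row]) -> List[Row]:
    """Emit a row only when a client's address differs from that
    client's previous address (the behaviour of the output table)."""
    last_address: Dict[int, str] = {}
    out: List[Row] = []
    for client, address in rows:
        if last_address.get(client) != address:
            out.append((client, address))
        last_address[client] = address
    return out

# The input table from the quoted message, in arrival order.
rows: List[Row] = [(1, "addr1"), (1, "addr1"), (1, "addr2"),
                   (1, "addr2"), (1, "addr1"), (1, "addr1")]
print(first_row_per_key(rows))  # [(1, 'addr1'), (1, 'addr2')]
print(emit_on_change(rows))     # [(1, 'addr1'), (1, 'addr2'), (1, 'addr1')]
```

Only emit_on_change reproduces the output table. The ROW_NUMBER() query drops the second addr1 because the (1, addr1) partition already has its first row. Producing the expected output requires remembering each client's previous address rather than every address seen so far.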
