Hi Laurent,

1. Deduplicate with keeping the first row will generate an append-only
stream. But I guess you are expecting to keep the last row which generates
an updating stream. An alternative way is you can
 use the "changelog-json" format in this repo [1], it will convert the
updating stream into append
records with change flag encoded.
2. Yes. It will replace records with the same key, i.e. upsert statement.
3. I think it will be available in one or two months. There will be a first
release candidate soon.
    You can watch on the dev ML. I'm not sure the plan of Ververica
platform, cc @Konstantin Knauf <konstan...@ververica.com>

Best,
Jark

[1]:
https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <laurent.exste...@euranova.eu>
wrote:

> Hi Jark,
>
> thanks for your quick reply. I was indeed expecting it.
>
> But that triggers the following questions:
>
>    1. Is there another way to do this deduplication and generate an
>    append-only stream? Match Recognize? UDF? ...?
>    2. If I would put Postgres as a sink, what would happen? Will the
>    events happen or will they replace the record with the same key?
>    3. When will release-1.12 be available? And when would it be
>    integrated in the Ververica platform?
>
> Thanks a lot for your help!
>
> Best Regards,
>
> Laurent.
>
>
>
> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Laurent,
>>
>> This is because the deduplicate node generates an updating stream,
>> however Kafka currently only supports append-only stream.
>> This can be addressed in release-1.12, because we introduce a new
>> connector "upsert-kafka" which supports writing updating
>>  streams into Kafka compacted topics.
>>
>> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
>> ConsumerRecord#timestamp()?
>> If yes, this is also supported in release-1.12 via metadata syntax in DDL
>> [1]:
>>
>> CREATE TABLE kafka_table (
>>   id BIGINT,
>>   name STRING,
>>   timestamp BIGINT METADATA,  -- read timestamp
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'test-topic',
>>   'format' = 'avro'
>> )
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>
>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>> laurent.exste...@euranova.eu> wrote:
>>
>>> Hello,
>>>
>>> I'm getting an error  in Flink SQL when reading from kafka,
>>> deduplicating records and sending them back to Kafka.
>>>
>>> The behavior I want is the following:
>>>
>>> *input:*
>>> | client_number | address |
>>> | ------------------- | ----------- |
>>> | 1                      | addr1     |
>>> | 1                      | addr1     |
>>> | 1                      | addr2     |
>>> | 1                      | addr2     |
>>> | 1                      | addr1     |
>>> | 1                      | addr1     |
>>>
>>> *output:*
>>> | client_number | address |
>>> | ------------------- | ----------- |
>>> | 1                      | addr1     |
>>> | 1                      | addr2     |
>>> | 1                      | addr1     |
>>>
>>> The error seems to say that the type of stream created by the
>>> deduplication query is of "update & delete" type, while kafka only supports
>>> append-only:
>>>
>>> Unsupported query
>>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>
>>>
>>> --> Is there a way to create an append only query from this kind of
>>> deduplication query (see my code here below)?
>>> --> Would that work if I would use, say, a Postgres sink?
>>>
>>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>>> (here I generated a processing date to allow ordering during deduplication)
>>>
>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>>> Flink SQL itself.
>>>
>>> Thanks in advance for your help.
>>>
>>> Best Regards,
>>>
>>> Laurent.
>>>
>>> -----------------------------------
>>> -- Read from customers kafka topic
>>> -----------------------------------
>>> CREATE TEMPORARY TABLE customers (
>>> `client_number` INT,
>>> `name` VARCHAR(100),
>>> `address` VARCHAR(100)
>>> )
>>> COMMENT ''
>>> WITH (
>>> 'connector' = 'kafka',
>>> 'format' = 'csv',
>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>> 'properties.group.id' = 'flinkSQL',
>>> 'topic' = 'customers',
>>> 'csv.field-delimiter' = ';',
>>> 'scan.startup.mode' = 'earliest-offset'
>>> );
>>>
>>>
>>>
>>> -----------------------------------
>>> -- Add metadata
>>> -----------------------------------
>>> CREATE TEMPORARY VIEW metadata AS
>>> SELECT *
>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>> , current_timestamp AS load_date
>>> , 'Kafka topic: customers' AS record_source
>>> FROM customers;
>>>
>>>
>>>
>>> -----------------------------------
>>> -- Deduplicate addresses
>>> -----------------------------------
>>> CREATE TEMPORARY VIEW dedup_address as
>>> SELECT customer_pk
>>> , client_number
>>> , load_date
>>> , address
>>> FROM (
>>> SELECT customer_pk
>>> , client_number
>>> , load_date
>>> , record_source
>>> , address
>>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>>> ORDER BY load_date ASC) AS rownum
>>> FROM metadata
>>> ) where rownum = 1;
>>>
>>>
>>>
>>>
>>>
>>>
>>> -----------------------------------
>>> -- Send to sat_customers_address kafka topic
>>> -----------------------------------
>>> CREATE TEMPORARY TABLE sat_customers_address (
>>> `customer_pk` VARCHAR(64),
>>> `client_number` INT,
>>> `address` VARCHAR(100)
>>> )
>>> COMMENT ''
>>> WITH (
>>> 'connector' = 'kafka',
>>> 'format' = 'csv',
>>> 'properties.bootstrap.servers' =
>>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>> 'properties.group.id' = 'flinkSQL',
>>> 'topic' = 'sat_customers_address'
>>> );
>>>
>>> INSERT INTO sat_customers_address
>>> SELECT customer_pk
>>> , client_number
>>> , address
>>> FROM dedup_address;
>>>
>>>
>>>
>>>
>>> --
>>> *Laurent Exsteens*
>>> Data Engineer
>>> (M) +32 (0) 486 20 48 36
>>>
>>> *EURA NOVA*
>>>
>>> Rue Emile Francqui, 4
>>>
>>> 1435 Mont-Saint-Guibert
>>>
>>> (T) +32 10 75 02 00
>>>
>>> *euranova.eu <http://euranova.eu/>*
>>>
>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>
>>> ♻ Be green, keep it on the screen
>>
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu <http://euranova.eu/>*
>
> *research.euranova.eu* <http://research.euranova.eu/>
>
> ♻ Be green, keep it on the screen

Reply via email to