Hi Laurent,

With respect to Ververica Platform, we will support Flink 1.12 and add
"upsert-kafka" as a packaged connector in our next minor release, which we
are targeting for February.
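
For reference, here is a minimal sketch of what the sink from this thread
might look like with "upsert-kafka" once Flink 1.12 is available. The table
name sat_customers_address_upsert and the choice of primary key are
assumptions made for illustration only; the topic and bootstrap servers are
copied from the original DDL quoted below.

```sql
-- Sketch only: assumes Flink 1.12+ with the upsert-kafka connector on the classpath.
CREATE TEMPORARY TABLE sat_customers_address_upsert (  -- hypothetical table name
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  -- upsert-kafka requires a primary key; this key choice is an assumption
  PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);
```

The primary key columns are written as the Kafka record key, so the target
topic would normally be a compacted one.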

Cheers,

Konstantin

On Thu, Nov 12, 2020 at 3:43 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
>
> 1. Deduplication that keeps the first row generates an append-only
> stream. But I guess you are expecting to keep the last row, which generates
> an updating stream. As an alternative, you can use the "changelog-json"
> format from this repo [1]; it converts the updating stream into
> append-only records with the change flag encoded.
> 2. Yes. It will replace records with the same key, i.e. it behaves like
> an upsert statement.
> 3. I think it will be available in one or two months. There will be a
> first release candidate soon.
>     You can follow it on the dev mailing list. I'm not sure about the
> plans for Ververica Platform; cc @Konstantin Knauf <konstan...@ververica.com>
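>
> A minimal sketch of the keep-first-row variant from point 1, assuming
> that the planner only treats the query as a deduplication when ORDER BY
> uses a time attribute. The computed column proc_time and the view name
> first_address are hypothetical; the connector options are copied from the
> original DDL.
>
> ```sql
> -- Sketch only: source table with a processing-time attribute added.
> CREATE TEMPORARY TABLE customers (
>   `client_number` INT,
>   `name` VARCHAR(100),
>   `address` VARCHAR(100),
>   `proc_time` AS PROCTIME()  -- hypothetical computed column
> ) WITH (
>   'connector' = 'kafka',
>   'format' = 'csv',
>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>   'properties.group.id' = 'flinkSQL',
>   'topic' = 'customers',
>   'csv.field-delimiter' = ';',
>   'scan.startup.mode' = 'earliest-offset'
> );
>
> -- Keep the first row per (client_number, address), ordered by processing time.
> CREATE TEMPORARY VIEW first_address AS  -- hypothetical view name
> SELECT client_number, address
> FROM (
>   SELECT client_number
>        , address
>        , ROW_NUMBER() OVER (PARTITION BY client_number, address
>                             ORDER BY proc_time ASC) AS rownum
>   FROM customers
> ) WHERE rownum = 1;
> ```
>
> Note that this keeps only the first occurrence of each (client_number,
> address) pair, so an address that reappears later (addr1 after addr2 in
> the example) would not be emitted a second time.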
>
> Best,
> Jark
>
> [1]:
> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>
> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> thanks for your quick reply. I was indeed expecting it.
>>
>> But that triggers the following questions:
>>
>>    1. Is there another way to do this deduplication and generate an
>>    append-only stream? Match Recognize? UDF? ...?
>>    2. If I used Postgres as a sink, what would happen? Will the
>>    events be appended, or will they replace the record with the same key?
>>    3. When will release-1.12 be available? And when would it be
>>    integrated in the Ververica platform?
>>
>> Thanks a lot for your help!
>>
>> Best Regards,
>>
>> Laurent.
>>
>>
>>
>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Laurent,
>>>
>>> This is because the deduplicate node generates an updating stream,
>>> but the Kafka connector currently supports only append-only streams.
>>> This can be addressed in release-1.12, which introduces a new
>>> connector, "upsert-kafka", that supports writing updating
>>> streams into Kafka compacted topics.
>>>
>>> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
>>> ConsumerRecord#timestamp()?
>>> If yes, this is also supported in release-1.12 via metadata syntax in
>>> DDL [1]:
>>>
>>> CREATE TABLE kafka_table (
>>>   id BIGINT,
>>>   name STRING,
>>>   `timestamp` BIGINT METADATA  -- read timestamp
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'test-topic',
>>>   'format' = 'avro'
>>> )
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>
>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>> laurent.exste...@euranova.eu> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm getting an error in Flink SQL when reading from Kafka,
>>>> deduplicating records and sending them back to Kafka.
>>>>
>>>> The behavior I want is the following:
>>>>
>>>> *input:*
>>>> | client_number | address |
>>>> | ------------- | ------- |
>>>> | 1             | addr1   |
>>>> | 1             | addr1   |
>>>> | 1             | addr2   |
>>>> | 1             | addr2   |
>>>> | 1             | addr1   |
>>>> | 1             | addr1   |
>>>>
>>>> *output:*
>>>> | client_number | address |
>>>> | ------------- | ------- |
>>>> | 1             | addr1   |
>>>> | 1             | addr2   |
>>>> | 1             | addr1   |
>>>>
>>>> The error seems to say that the stream created by the deduplication
>>>> query is of the "update & delete" type, while Kafka only supports
>>>> append-only:
>>>>
>>>> Unsupported query
>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>> consuming update and delete changes which is produced by node
>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>
>>>>
>>>> --> Is there a way to create an append-only query from this kind of
>>>> deduplication query (see my code below)?
>>>> --> Would that work if I used, say, a Postgres sink?
>>>>
>>>> Bonus question: can we extract the Kafka ingestion date using Flink
>>>> SQL? (here I generated a processing date to allow ordering during
>>>> deduplication)
>>>>
>>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked
>>>> to Flink SQL itself.
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>> Best Regards,
>>>>
>>>> Laurent.
>>>>
>>>> -----------------------------------
>>>> -- Read from customers kafka topic
>>>> -----------------------------------
>>>> CREATE TEMPORARY TABLE customers (
>>>> `client_number` INT,
>>>> `name` VARCHAR(100),
>>>> `address` VARCHAR(100)
>>>> )
>>>> COMMENT ''
>>>> WITH (
>>>> 'connector' = 'kafka',
>>>> 'format' = 'csv',
>>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>> 'properties.group.id' = 'flinkSQL',
>>>> 'topic' = 'customers',
>>>> 'csv.field-delimiter' = ';',
>>>> 'scan.startup.mode' = 'earliest-offset'
>>>> );
>>>>
>>>>
>>>>
>>>> -----------------------------------
>>>> -- Add metadata
>>>> -----------------------------------
>>>> CREATE TEMPORARY VIEW metadata AS
>>>> SELECT *
>>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>>> , current_timestamp AS load_date
>>>> , 'Kafka topic: customers' AS record_source
>>>> FROM customers;
>>>>
>>>>
>>>>
>>>> -----------------------------------
>>>> -- Deduplicate addresses
>>>> -----------------------------------
>>>> CREATE TEMPORARY VIEW dedup_address AS
>>>> SELECT customer_pk
>>>>      , client_number
>>>>      , load_date
>>>>      , address
>>>> FROM (
>>>>   SELECT customer_pk
>>>>        , client_number
>>>>        , load_date
>>>>        , record_source
>>>>        , address
>>>>        , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>>>>                             ORDER BY load_date ASC) AS rownum
>>>>   FROM metadata
>>>> ) WHERE rownum = 1;
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> -----------------------------------
>>>> -- Send to sat_customers_address kafka topic
>>>> -----------------------------------
>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>> `customer_pk` VARCHAR(64),
>>>> `client_number` INT,
>>>> `address` VARCHAR(100)
>>>> )
>>>> COMMENT ''
>>>> WITH (
>>>> 'connector' = 'kafka',
>>>> 'format' = 'csv',
>>>> 'properties.bootstrap.servers' =
>>>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>> 'properties.group.id' = 'flinkSQL',
>>>> 'topic' = 'sat_customers_address'
>>>> );
>>>>
>>>> INSERT INTO sat_customers_address
>>>> SELECT customer_pk
>>>> , client_number
>>>> , address
>>>> FROM dedup_address;
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *Laurent Exsteens*
>>>> Data Engineer
>>>> (M) +32 (0) 486 20 48 36
>>>>
>>>> *EURA NOVA*
>>>>
>>>> Rue Emile Francqui, 4
>>>>
>>>> 1435 Mont-Saint-Guibert
>>>>
>>>> (T) +32 10 75 02 00
>>>>
>>>> *euranova.eu <http://euranova.eu/>*
>>>>
>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>
>>>> ♻ Be green, keep it on the screen
>>>
>>>
>>
>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
