Hi Laurent,

1. Currently, it's impossible to convert deduplicate with last row into an
append-only stream.
2. Yes, I think Ververica platform doesn't support 'changelog-json' format
natively.

However, regarding your case, I think you can use keep first row on
client_number+address key.

SELECT *
FROM (
   SELECT client_number, address, load_date
     ROW_NUMBER() OVER
       (PARTITION BY client_number, address ORDER BY proctime() ASC) AS
rownum
   FROM src)
WHERE rownum = 1

That means, the duplicate records on the same client_number + address will
be ignored,
but the new value of address will be emitted as an append-only stream.

Hope this helps you.

Best,
Jark


On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <laurent.exste...@euranova.eu>
wrote:

> Hi Jark,
>
> thanks again for your quick response!
>
> I tried multiple variants of my query by:
> - specifying only the primary key in the PARTITION BY clause
> - changing the order to DESC to keep the last row
>
> --> I unfortunately always get the same error message.
> If I try to make a simple select on the result of this query, I also get
> the following error: The submitted query is not an append-only query.
> Only queries producing exclusively new rows over time are supported at the
> moment. So whatever change I make, I never get an append-only query -->
> Is there something I missed?
>
> I also tried to write to kafka as changelog-json, but I got the answer: The
> sink connector for table `vvp`.`default`.`sat_customers_address` could not
> be created. 'changelog-json' is not a supported sink format. Supported sink
> formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
> (maybe because I'm on the Ververica platform?)
> This also seem to require an extra kafka topic then, so not ideal.
>
>
> *I'm starting to wonder if the deduplication query is really what I need.*
>
> What I need is:
> - to forward only the records where some columns (ideally configurable)
> change for a specific primary key.
> - in realtime (no windowing)
> - and have as a result an append-only stream.
>
> Like this:
>
> *input:*
>                     *output* (this is what should ultimatelly be
> published to Kafka and later inserted in a RDBMS):
> | client_number | address |  load_date |                   | client_number
> | address |  load_date |
> | ------------------- | ----------- | -------------- |                  |
> ------------------- | ----------- | -------------- |
> | 1                      | addr1     | ts1             |     -->        |
> 1                      | addr1     | ts1             |
> | 1                      | addr1     | ts2             |
> | 1                      | addr2     | ts3             |     -->        |
> 1                      | addr2     | ts3             |
> | 1                      | addr2     | ts4             |
> | 1                      | addr1     | ts5             |     -->        |
> 1                      | addr1     | ts5             |
> | 1                      | addr1     | ts6             |
>
>
> --> is this deduplication query the right fit therefore?
>      - if yes, how should it be written to generate an append-only stream?
>      - If not, are there other options? (Match Recognize, UDF, ....?)
>
> Thanks a lot for your much appreciated help :).
>
> Best Regards,
>
> Laurent.
>
>
> On Thu, 12 Nov 2020 at 07:26, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Laurent,
>>
>> > What I want is a record to be forwarded only if some of the columns
>> change
>>
>> IIUC, what you want is still deduplication with the last row.
>> Keeping first row will drop all the duplicate rows on the same primary
>> key.
>> Keeping last row will emit updates when the duplicate rows on the same
>> primary key, that means column value changes will notify downstream
>> operators.
>> The difference of keeping first row and last row is specified by the
>> direction of ORDER BY clause [1].
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>
>>
>>
>>
>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
>> laurent.exste...@euranova.eu> wrote:
>>
>>> Thanks.
>>>
>>> I actually want the first row. What I want is a record to be forwarded
>>> only if some of the columns change (of course keyed by the primary key). I
>>> used rownum = 1, is that not selecting the first row?
>>>
>>> How can I adapt my query to let only the row effectively changing the
>>> values pass, as an append only stream?
>>>
>>> If not possible, I'll look at converting it after. But I prefer a
>>> solution in the deduplication query.
>>> The goal is to show my customer that what they want to achieve is very
>>> straightforward in flink SQL, so the simpler the queries the better. I need
>>> to present my conclusions tomorrow.
>>>
>>> Thanks a lot already for your help!
>>>
>>> Best regards,
>>>
>>> Laurent.
>>>
>>>
>>> On Thu, Nov 12, 2020, 03:43 Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Laurent,
>>>>
>>>> 1. Deduplicate with keeping the first row will generate an append-only
>>>> stream. But I guess you are expecting to keep the last row which generates
>>>> an updating stream. An alternative way is you can
>>>>  use the "changelog-json" format in this repo [1], it will convert the
>>>> updating stream into append
>>>> records with change flag encoded.
>>>> 2. Yes. It will replace records with the same key, i.e. upsert
>>>> statement.
>>>> 3. I think it will be available in one or two months. There will be a
>>>> first release candidate soon.
>>>>     You can watch on the dev ML. I'm not sure the plan of Ververica
>>>> platform, cc @Konstantin Knauf <konstan...@ververica.com>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>>
>>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
>>>> laurent.exste...@euranova.eu> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> thanks for your quick reply. I was indeed expecting it.
>>>>>
>>>>> But that triggers the following questions:
>>>>>
>>>>>    1. Is there another way to do this deduplication and generate an
>>>>>    append-only stream? Match Recognize? UDF? ...?
>>>>>    2. If I would put Postgres as a sink, what would happen? Will the
>>>>>    events happen or will they replace the record with the same key?
>>>>>    3. When will release-1.12 be available? And when would it be
>>>>>    integrated in the Ververica platform?
>>>>>
>>>>> Thanks a lot for your help!
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Laurent.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Laurent,
>>>>>>
>>>>>> This is because the deduplicate node generates an updating stream,
>>>>>> however Kafka currently only supports append-only stream.
>>>>>> This can be addressed in release-1.12, because we introduce a new
>>>>>> connector "upsert-kafka" which supports writing updating
>>>>>>  streams into Kafka compacted topics.
>>>>>>
>>>>>> Does the "Kafka ingestion date" refer to "kafka message timestamp",
>>>>>> i.e. ConsumerRecord#timestamp()?
>>>>>> If yes, this is also supported in release-1.12 via metadata syntax in
>>>>>> DDL [1]:
>>>>>>
>>>>>> CREATE TABLE kafka_table (
>>>>>>   id BIGINT,
>>>>>>   name STRING,
>>>>>>   timestamp BIGINT METADATA,  -- read timestamp
>>>>>> ) WITH (
>>>>>>   'connector' = 'kafka',
>>>>>>   'topic' = 'test-topic',
>>>>>>   'format' = 'avro'
>>>>>> )
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]:
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>>>>
>>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm getting an error  in Flink SQL when reading from kafka,
>>>>>>> deduplicating records and sending them back to Kafka.
>>>>>>>
>>>>>>> The behavior I want is the following:
>>>>>>>
>>>>>>> *input:*
>>>>>>> | client_number | address |
>>>>>>> | ------------------- | ----------- |
>>>>>>> | 1                      | addr1     |
>>>>>>> | 1                      | addr1     |
>>>>>>> | 1                      | addr2     |
>>>>>>> | 1                      | addr2     |
>>>>>>> | 1                      | addr1     |
>>>>>>> | 1                      | addr1     |
>>>>>>>
>>>>>>> *output:*
>>>>>>> | client_number | address |
>>>>>>> | ------------------- | ----------- |
>>>>>>> | 1                      | addr1     |
>>>>>>> | 1                      | addr2     |
>>>>>>> | 1                      | addr1     |
>>>>>>>
>>>>>>> The error seems to say that the type of stream created by the
>>>>>>> deduplication query is of "update & delete" type, while kafka only 
>>>>>>> supports
>>>>>>> append-only:
>>>>>>>
>>>>>>> Unsupported query
>>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>>>>> consuming update and delete changes which is produced by node
>>>>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>>>>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>>
>>>>>>>
>>>>>>> --> Is there a way to create an append only query from this kind of
>>>>>>> deduplication query (see my code here below)?
>>>>>>> --> Would that work if I would use, say, a Postgres sink?
>>>>>>>
>>>>>>> Bonus question: can we extract the Kafka ingestion date using Flink
>>>>>>> SQL? (here I generated a processing date to allow ordering during
>>>>>>> deduplication)
>>>>>>>
>>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is
>>>>>>> linked to Flink SQL itself.
>>>>>>>
>>>>>>> Thanks in advance for your help.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Laurent.
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Read from customers kafka topic
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>>> `client_number` INT,
>>>>>>> `name` VARCHAR(100),
>>>>>>> `address` VARCHAR(100)
>>>>>>> )
>>>>>>> COMMENT ''
>>>>>>> WITH (
>>>>>>> 'connector' = 'kafka',
>>>>>>> 'format' = 'csv',
>>>>>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>>> 'topic' = 'customers',
>>>>>>> 'csv.field-delimiter' = ';',
>>>>>>> 'scan.startup.mode' = 'earliest-offset'
>>>>>>> );
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Add metadata
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>>> SELECT *
>>>>>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>>>>>> , current_timestamp AS load_date
>>>>>>> , 'Kafka topic: customers' AS record_source
>>>>>>> FROM customers;
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Deduplicate addresses
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY VIEW dedup_address as
>>>>>>> SELECT customer_pk
>>>>>>> , client_number
>>>>>>> , load_date
>>>>>>> , address
>>>>>>> FROM (
>>>>>>> SELECT customer_pk
>>>>>>> , client_number
>>>>>>> , load_date
>>>>>>> , record_source
>>>>>>> , address
>>>>>>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number,
>>>>>>> address ORDER BY load_date ASC) AS rownum
>>>>>>> FROM metadata
>>>>>>> ) where rownum = 1;
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Send to sat_customers_address kafka topic
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>>> `customer_pk` VARCHAR(64),
>>>>>>> `client_number` INT,
>>>>>>> `address` VARCHAR(100)
>>>>>>> )
>>>>>>> COMMENT ''
>>>>>>> WITH (
>>>>>>> 'connector' = 'kafka',
>>>>>>> 'format' = 'csv',
>>>>>>> 'properties.bootstrap.servers' =
>>>>>>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>>> 'topic' = 'sat_customers_address'
>>>>>>> );
>>>>>>>
>>>>>>> INSERT INTO sat_customers_address
>>>>>>> SELECT customer_pk
>>>>>>> , client_number
>>>>>>> , address
>>>>>>> FROM dedup_address;
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *Laurent Exsteens*
>>>>>>> Data Engineer
>>>>>>> (M) +32 (0) 486 20 48 36
>>>>>>>
>>>>>>> *EURA NOVA*
>>>>>>>
>>>>>>> Rue Emile Francqui, 4
>>>>>>>
>>>>>>> 1435 Mont-Saint-Guibert
>>>>>>>
>>>>>>> (T) +32 10 75 02 00
>>>>>>>
>>>>>>> *euranova.eu <http://euranova.eu/>*
>>>>>>>
>>>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>>>
>>>>>>> ♻ Be green, keep it on the screen
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> *Laurent Exsteens*
>>>>> Data Engineer
>>>>> (M) +32 (0) 486 20 48 36
>>>>>
>>>>> *EURA NOVA*
>>>>>
>>>>> Rue Emile Francqui, 4
>>>>>
>>>>> 1435 Mont-Saint-Guibert
>>>>>
>>>>> (T) +32 10 75 02 00
>>>>>
>>>>> *euranova.eu <http://euranova.eu/>*
>>>>>
>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>
>>>>> ♻ Be green, keep it on the screen
>>>>
>>>>
>>> ♻ Be green, keep it on the screen
>>
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu <http://euranova.eu/>*
>
> *research.euranova.eu* <http://research.euranova.eu/>
>
> ♻ Be green, keep it on the screen

Reply via email to