Hi Jark,

thanks again for your quick response!

I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row

--> Unfortunately, I always get the same error message.
If I run a simple SELECT on the result of this query, I also get
the following error: The submitted query is not an append-only query. Only
queries producing exclusively new rows over time are supported at the moment.
So whatever change I make, I never get an append-only query --> Is there
something I missed?

I also tried to write to Kafka as changelog-json, but I got this answer: The
sink connector for table `vvp`.`default`.`sat_customers_address` could not
be created. 'changelog-json' is not a supported sink format. Supported sink
formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
(maybe because I'm on the Ververica platform?)
This also seems to require an extra Kafka topic, so it is not ideal.


*I'm starting to wonder if the deduplication query is really what I need.*

What I need is:
- to forward a record only when some columns (ideally configurable)
change for a given primary key,
- in real time (no windowing),
- and to get an append-only stream as the result.

Like this:

*input:*
| client_number | address | load_date |
| ------------- | ------- | --------- |
| 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |
| 1             | addr1   | ts6       |

*output* (this is what should ultimately be published to Kafka and later
inserted in an RDBMS):
| client_number | address | load_date |
| ------------- | ------- | --------- |
| 1             | addr1   | ts1       |
| 1             | addr2   | ts3       |
| 1             | addr1   | ts5       |
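
To make the expected behaviour unambiguous, here is a minimal sketch in plain
Python. It is not Flink code and not a proposed solution: the function name
`forward_changes` and the tuple layout are only assumptions for illustration.
It keeps the last address seen per client_number and forwards a row only when
the address differs from it, so the result is append-only.

```python
from typing import Dict, Iterable, Iterator, Tuple

# (client_number, address, load_date); hypothetical layout for illustration
Row = Tuple[int, str, str]

_MISSING = object()  # marks a key for which no row has been seen yet


def forward_changes(rows: Iterable[Row]) -> Iterator[Row]:
    """Yield a row only when its address differs from the previous one
    seen for the same client_number. Rows are never retracted or updated."""
    last_address: Dict[int, object] = {}
    for client_number, address, load_date in rows:
        if last_address.get(client_number, _MISSING) != address:
            last_address[client_number] = address
            yield (client_number, address, load_date)


rows = [
    (1, "addr1", "ts1"),
    (1, "addr1", "ts2"),
    (1, "addr2", "ts3"),
    (1, "addr2", "ts4"),
    (1, "addr1", "ts5"),
    (1, "addr1", "ts6"),
]
result = list(forward_changes(rows))
for row in result:
    print(row)
```

With the input above, only the rows with load_date ts1, ts3 and ts5 are
forwarded, which is the output I am looking for.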


--> Is the deduplication query therefore the right fit?
     - If yes, how should it be written to generate an append-only stream?
     - If not, are there other options (MATCH_RECOGNIZE, a UDF, ...)?

Thanks a lot for your much appreciated help :).

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 07:26, Jark Wu <[email protected]> wrote:

> Hi Laurent,
>
> > What I want is a record to be forwarded only if some of the columns
> change
>
> IIUC, what you want is still deduplication with the last row.
> Keeping first row will drop all the duplicate rows on the same primary
> key.
> Keeping last row will emit updates when the duplicate rows on the same
> primary key, that means column value changes will notify downstream
> operators.
> The difference of keeping first row and last row is specified by the
> direction of ORDER BY clause [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
>
>
>
> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
> [email protected]> wrote:
>
>> Thanks.
>>
>> I actually want the first row. What I want is a record to be forwarded
>> only if some of the columns change (of course keyed by the primary key). I
>> used rownum = 1, is that not selecting the first row?
>>
>> How can I adapt my query to let only the row effectively changing the
>> values pass, as an append only stream?
>>
>> If not possible, I'll look at converting it after. But I prefer a
>> solution in the deduplication query.
>> The goal is to show my customer that what they want to achieve is very
>> straightforward in flink SQL, so the simpler the queries the better. I need
>> to present my conclusions tomorrow.
>>
>> Thanks a lot already for your help!
>>
>> Best regards,
>>
>> Laurent.
>>
>>
>> On Thu, Nov 12, 2020, 03:43 Jark Wu <[email protected]> wrote:
>>
>>> Hi Laurent,
>>>
>>> 1. Deduplicate with keeping the first row will generate an append-only
>>> stream. But I guess you are expecting to keep the last row which generates
>>> an updating stream. An alternative way is you can
>>>  use the "changelog-json" format in this repo [1], it will convert the
>>> updating stream into append
>>> records with change flag encoded.
>>> 2. Yes. It will replace records with the same key, i.e. upsert
>>> statement.
>>> 3. I think it will be available in one or two months. There will be a
>>> first release candidate soon.
>>>     You can watch on the dev ML. I'm not sure the plan of Ververica
>>> platform, cc @Konstantin Knauf <[email protected]>
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>
>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
>>> [email protected]> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> thanks for your quick reply. I was indeed expecting it.
>>>>
>>>> But that triggers the following questions:
>>>>
>>>>    1. Is there another way to do this deduplication and generate an
>>>>    append-only stream? Match Recognize? UDF? ...?
>>>>    2. If I would put Postgres as a sink, what would happen? Will the
>>>>    events happen or will they replace the record with the same key?
>>>>    3. When will release-1.12 be available? And when would it be
>>>>    integrated in the Ververica platform?
>>>>
>>>> Thanks a lot for your help!
>>>>
>>>> Best Regards,
>>>>
>>>> Laurent.
>>>>
>>>>
>>>>
>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <[email protected]> wrote:
>>>>
>>>>> Hi Laurent,
>>>>>
>>>>> This is because the deduplicate node generates an updating stream,
>>>>> however Kafka currently only supports append-only stream.
>>>>> This can be addressed in release-1.12, because we introduce a new
>>>>> connector "upsert-kafka" which supports writing updating
>>>>>  streams into Kafka compacted topics.
>>>>>
>>>>> Does the "Kafka ingestion date" refer to "kafka message timestamp",
>>>>> i.e. ConsumerRecord#timestamp()?
>>>>> If yes, this is also supported in release-1.12 via metadata syntax in
>>>>> DDL [1]:
>>>>>
>>>>> CREATE TABLE kafka_table (
>>>>>   id BIGINT,
>>>>>   name STRING,
>>>>>   `timestamp` BIGINT METADATA  -- read timestamp
>>>>> ) WITH (
>>>>>   'connector' = 'kafka',
>>>>>   'topic' = 'test-topic',
>>>>>   'format' = 'avro'
>>>>> )
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> [1]:
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>>>
>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm getting an error  in Flink SQL when reading from kafka,
>>>>>> deduplicating records and sending them back to Kafka.
>>>>>>
>>>>>> The behavior I want is the following:
>>>>>>
>>>>>> *input:*
>>>>>> | client_number | address |
>>>>>> | ------------------- | ----------- |
>>>>>> | 1                      | addr1     |
>>>>>> | 1                      | addr1     |
>>>>>> | 1                      | addr2     |
>>>>>> | 1                      | addr2     |
>>>>>> | 1                      | addr1     |
>>>>>> | 1                      | addr1     |
>>>>>>
>>>>>> *output:*
>>>>>> | client_number | address |
>>>>>> | ------------------- | ----------- |
>>>>>> | 1                      | addr1     |
>>>>>> | 1                      | addr2     |
>>>>>> | 1                      | addr1     |
>>>>>>
>>>>>> The error seems to say that the type of stream created by the
>>>>>> deduplication query is of "update & delete" type, while kafka only 
>>>>>> supports
>>>>>> append-only:
>>>>>>
>>>>>> Unsupported query
>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>>>> consuming update and delete changes which is produced by node
>>>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>>>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>
>>>>>>
>>>>>> --> Is there a way to create an append only query from this kind of
>>>>>> deduplication query (see my code here below)?
>>>>>> --> Would that work if I would use, say, a Postgres sink?
>>>>>>
>>>>>> Bonus question: can we extract the Kafka ingestion date using Flink
>>>>>> SQL? (here I generated a processing date to allow ordering during
>>>>>> deduplication)
>>>>>>
>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked
>>>>>> to Flink SQL itself.
>>>>>>
>>>>>> Thanks in advance for your help.
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Laurent.
>>>>>>
>>>>>> -----------------------------------
>>>>>> -- Read from customers kafka topic
>>>>>> -----------------------------------
>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>> `client_number` INT,
>>>>>> `name` VARCHAR(100),
>>>>>> `address` VARCHAR(100)
>>>>>> )
>>>>>> COMMENT ''
>>>>>> WITH (
>>>>>> 'connector' = 'kafka',
>>>>>> 'format' = 'csv',
>>>>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>> 'topic' = 'customers',
>>>>>> 'csv.field-delimiter' = ';',
>>>>>> 'scan.startup.mode' = 'earliest-offset'
>>>>>> );
>>>>>>
>>>>>>
>>>>>>
>>>>>> -----------------------------------
>>>>>> -- Add metadata
>>>>>> -----------------------------------
>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>> SELECT *
>>>>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>>>>> , current_timestamp AS load_date
>>>>>> , 'Kafka topic: customers' AS record_source
>>>>>> FROM customers;
>>>>>>
>>>>>>
>>>>>>
>>>>>> -----------------------------------
>>>>>> -- Deduplicate addresses
>>>>>> -----------------------------------
>>>>>> CREATE TEMPORARY VIEW dedup_address as
>>>>>> SELECT customer_pk
>>>>>> , client_number
>>>>>> , load_date
>>>>>> , address
>>>>>> FROM (
>>>>>> SELECT customer_pk
>>>>>> , client_number
>>>>>> , load_date
>>>>>> , record_source
>>>>>> , address
>>>>>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>>>>>> ORDER BY load_date ASC) AS rownum
>>>>>> FROM metadata
>>>>>> ) where rownum = 1;
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -----------------------------------
>>>>>> -- Send to sat_customers_address kafka topic
>>>>>> -----------------------------------
>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>> `customer_pk` VARCHAR(64),
>>>>>> `client_number` INT,
>>>>>> `address` VARCHAR(100)
>>>>>> )
>>>>>> COMMENT ''
>>>>>> WITH (
>>>>>> 'connector' = 'kafka',
>>>>>> 'format' = 'csv',
>>>>>> 'properties.bootstrap.servers' =
>>>>>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>> 'topic' = 'sat_customers_address'
>>>>>> );
>>>>>>
>>>>>> INSERT INTO sat_customers_address
>>>>>> SELECT customer_pk
>>>>>> , client_number
>>>>>> , address
>>>>>> FROM dedup_address;
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *Laurent Exsteens*
>>>>>> Data Engineer
>>>>>> (M) +32 (0) 486 20 48 36
>>>>>>
>>>>>> *EURA NOVA*
>>>>>>
>>>>>> Rue Emile Francqui, 4
>>>>>>
>>>>>> 1435 Mont-Saint-Guibert
>>>>>>
>>>>>> (T) +32 10 75 02 00
>>>>>>
>>>>>> *euranova.eu <http://euranova.eu/>*
>>>>>>
>>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>>
>>>>>> ♻ Be green, keep it on the screen
>>>>>
>>>>>
>>>>
>>>
>>>
>
>

