Hi Laurent,

1. Currently, it's not possible to convert a deduplication that keeps the last row into an append-only stream.
2. Yes, I think the Ververica platform doesn't support the 'changelog-json' format natively (see the sketch just below).
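For reference, such a sink on a stock Flink distribution would be declared roughly as follows. This is a minimal sketch assuming the flink-format-changelog-json jar from the flink-cdc-connectors project is on the classpath; the table name, topic and broker address reuse Laurent's example and are only illustrative:

CREATE TEMPORARY TABLE sat_customers_address_changelog (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
) WITH (
  'connector' = 'kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'format' = 'changelog-json'  -- encodes the change flag (insert/update/delete) together with the row into JSON
);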
However, regarding your case, I think you can keep the first row on the client_number + address key:

SELECT *
FROM (
  SELECT client_number, address, load_date,
         ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY PROCTIME() ASC) AS rownum
  FROM src)
WHERE rownum = 1

That means duplicate records on the same client_number + address will be ignored, but a new value of address will be emitted, as an append-only stream.

Hope this helps you.

Best,
Jark

On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:

> Hi Jark,
>
> thanks again for your quick response!
>
> I tried multiple variants of my query by:
> - specifying only the primary key in the PARTITION BY clause
> - changing the order to DESC to keep the last row
>
> --> I unfortunately always get the same error message.
> If I try to make a simple select on the result of this query, I also get the following error: The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment. So whatever change I make, I never get an append-only query.
> --> Is there something I missed?
>
> I also tried to write to Kafka as changelog-json, but I got the answer: The sink connector for table `vvp`.`default`.`sat_customers_address` could not be created. 'changelog-json' is not a supported sink format. Supported sink formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet']. (maybe because I'm on the Ververica platform?)
> This also seems to require an extra Kafka topic then, so not ideal.
>
> *I'm starting to wonder if the deduplication query is really what I need.*
>
> What I need is:
> - to forward only the records where some columns (ideally configurable) change for a specific primary key,
> - in real time (no windowing),
> - and have as a result an append-only stream.
>
> Like this:
>
> *input:*                                    *output* (this is what should ultimately be published to Kafka and later inserted in an RDBMS):
> | client_number | address | load_date |     | client_number | address | load_date |
> | ------------- | ------- | --------- |     | ------------- | ------- | --------- |
> | 1             | addr1   | ts1       | --> | 1             | addr1   | ts1       |
> | 1             | addr1   | ts2       |
> | 1             | addr2   | ts3       | --> | 1             | addr2   | ts3       |
> | 1             | addr2   | ts4       |
> | 1             | addr1   | ts5       | --> | 1             | addr1   | ts5       |
> | 1             | addr1   | ts6       |
>
> --> Is this deduplication query therefore the right fit?
> - If yes, how should it be written to generate an append-only stream?
> - If not, are there other options? (Match Recognize, UDF, ...?)
>
> Thanks a lot for your much appreciated help :).
>
> Best Regards,
>
> Laurent.
>
> On Thu, 12 Nov 2020 at 07:26, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Laurent,
>>
>> > What I want is a record to be forwarded only if some of the columns change
>>
>> IIUC, what you want is still deduplication with the last row.
>> Keeping the first row will drop all the duplicate rows on the same primary key.
>> Keeping the last row will emit updates whenever duplicate rows arrive on the same primary key, which means column value changes will notify downstream operators.
>> The difference between keeping the first row and the last row is specified by the direction of the ORDER BY clause [1].
>>
>> Best,
>> Jark
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
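To make the ORDER BY point concrete, here is a minimal sketch of the two variants, assuming the source table src declares a processing-time attribute (e.g. proctime AS PROCTIME() in its DDL); the queries differ only in the sort direction:

-- keep FIRST row per key: later duplicates on the key are dropped, result is append-only
SELECT client_number, address, load_date
FROM (
  SELECT client_number, address, load_date,
         ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime ASC) AS rownum
  FROM src)
WHERE rownum = 1;

-- keep LAST row per key: each new row on the key emits an update, result is an updating stream
SELECT client_number, address, load_date
FROM (
  SELECT client_number, address, load_date,
         ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime DESC) AS rownum
  FROM src)
WHERE rownum = 1;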
>>
>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>
>>> Thanks.
>>>
>>> I actually want the first row. What I want is a record to be forwarded only if some of the columns change (of course keyed by the primary key). I used rownum = 1, is that not selecting the first row?
>>>
>>> How can I adapt my query to let only the rows that effectively change the values pass, as an append-only stream?
>>>
>>> If that's not possible, I'll look at converting it afterwards. But I would prefer a solution in the deduplication query.
>>> The goal is to show my customer that what they want to achieve is very straightforward in Flink SQL, so the simpler the queries the better. I need to present my conclusions tomorrow.
>>>
>>> Thanks a lot already for your help!
>>>
>>> Best regards,
>>>
>>> Laurent.
>>>
>>> On Thu, Nov 12, 2020, 03:43 Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Laurent,
>>>>
>>>> 1. Deduplication keeping the first row will generate an append-only stream. But I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format from this repo [1]; it will convert the updating stream into append-only records with a change flag encoded.
>>>> 2. Yes. It will replace records with the same key, i.e. an upsert statement.
>>>> 3. I think it will be available in one or two months. There will be a first release candidate soon.
>>>> You can watch the dev ML. I'm not sure about the plan for the Ververica platform, cc @Konstantin Knauf <konstan...@ververica.com>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]: https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>>
>>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> thanks for your quick reply. I was indeed expecting it.
>>>>>
>>>>> But that triggers the following questions:
>>>>>
>>>>> 1. Is there another way to do this deduplication and generate an append-only stream? Match Recognize? UDF? ...?
>>>>> 2. If I put Postgres as a sink, what would happen? Would the events be appended or would they replace the record with the same key?
>>>>> 3. When will release-1.12 be available? And when would it be integrated in the Ververica platform?
>>>>>
>>>>> Thanks a lot for your help!
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Laurent.
>>>>>
>>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Laurent,
>>>>>>
>>>>>> This is because the deduplicate node generates an updating stream, while Kafka currently only supports append-only streams.
>>>>>> This will be addressed in release-1.12, because we introduce a new connector "upsert-kafka" which supports writing updating streams into Kafka compacted topics.
>>>>>>
>>>>>> Does the "Kafka ingestion date" refer to the "kafka message timestamp", i.e. ConsumerRecord#timestamp()?
>>>>>> If yes, this is also supported in release-1.12 via the metadata syntax in DDL [1]:
>>>>>>
>>>>>> CREATE TABLE kafka_table (
>>>>>>   id BIGINT,
>>>>>>   name STRING,
>>>>>>   `timestamp` BIGINT METADATA  -- read timestamp
>>>>>> ) WITH (
>>>>>>   'connector' = 'kafka',
>>>>>>   'topic' = 'test-topic',
>>>>>>   'format' = 'avro'
>>>>>> )
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
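For reference, the released 1.12 Kafka connector is expected to expose the record timestamp as a metadata column roughly as sketched below; the column name is illustrative, and the exact declared type and metadata key should be double-checked against the released documentation:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- Kafka record timestamp (ConsumerRecord#timestamp()) exposed as a metadata column
  `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
);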
>>>>>>
>>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.
>>>>>>>
>>>>>>> The behavior I want is the following:
>>>>>>>
>>>>>>> *input:*
>>>>>>> | client_number | address |
>>>>>>> | ------------- | ------- |
>>>>>>> | 1             | addr1   |
>>>>>>> | 1             | addr1   |
>>>>>>> | 1             | addr2   |
>>>>>>> | 1             | addr2   |
>>>>>>> | 1             | addr1   |
>>>>>>> | 1             | addr1   |
>>>>>>>
>>>>>>> *output:*
>>>>>>> | client_number | address |
>>>>>>> | ------------- | ------- |
>>>>>>> | 1             | addr1   |
>>>>>>> | 1             | addr2   |
>>>>>>> | 1             | addr1   |
>>>>>>>
>>>>>>> The error seems to say that the type of stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only:
>>>>>>>
>>>>>>> Unsupported query
>>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>>
>>>>>>> --> Is there a way to create an append-only query from this kind of deduplication query (see my code here below)?
>>>>>>> --> Would that work if I used, say, a Postgres sink?
>>>>>>>
>>>>>>> Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)
>>>>>>>
>>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.
>>>>>>>
>>>>>>> Thanks in advance for your help.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Laurent.
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Read from customers kafka topic
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>>>   `client_number` INT,
>>>>>>>   `name` VARCHAR(100),
>>>>>>>   `address` VARCHAR(100)
>>>>>>> )
>>>>>>> COMMENT ''
>>>>>>> WITH (
>>>>>>>   'connector' = 'kafka',
>>>>>>>   'format' = 'csv',
>>>>>>>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>>>>   'properties.group.id' = 'flinkSQL',
>>>>>>>   'topic' = 'customers',
>>>>>>>   'csv.field-delimiter' = ';',
>>>>>>>   'scan.startup.mode' = 'earliest-offset'
>>>>>>> );
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Add metadata
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>>> SELECT *
>>>>>>>   , sha256(cast(client_number as STRING)) AS customer_pk
>>>>>>>   , current_timestamp AS load_date
>>>>>>>   , 'Kafka topic: customers' AS record_source
>>>>>>> FROM customers;
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Deduplicate addresses
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY VIEW dedup_address AS
>>>>>>> SELECT customer_pk
>>>>>>>   , client_number
>>>>>>>   , load_date
>>>>>>>   , address
>>>>>>> FROM (
>>>>>>>   SELECT customer_pk
>>>>>>>     , client_number
>>>>>>>     , load_date
>>>>>>>     , record_source
>>>>>>>     , address
>>>>>>>     , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER BY load_date ASC) AS rownum
>>>>>>>   FROM metadata
>>>>>>> ) WHERE rownum = 1;
>>>>>>>
>>>>>>> -----------------------------------
>>>>>>> -- Send to sat_customers_address kafka topic
>>>>>>> -----------------------------------
>>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>>>   `customer_pk` VARCHAR(64),
>>>>>>>   `client_number` INT,
>>>>>>>   `address` VARCHAR(100)
>>>>>>> )
>>>>>>> COMMENT ''
>>>>>>> WITH (
>>>>>>>   'connector' = 'kafka',
>>>>>>>   'format' = 'csv',
>>>>>>>   'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>>>   'properties.group.id' = 'flinkSQL',
>>>>>>>   'topic' = 'sat_customers_address'
>>>>>>> );
>>>>>>>
>>>>>>> INSERT INTO sat_customers_address
>>>>>>> SELECT customer_pk
>>>>>>>   , client_number
>>>>>>>   , address
>>>>>>> FROM dedup_address;
>>>>>>>
>>>>>>> --
>>>>>>> *Laurent Exsteens*
>>>>>>> Data Engineer
>>>>>>> (M) +32 (0) 486 20 48 36
>>>>>>>
>>>>>>> *EURA NOVA*
>>>>>>> Rue Emile Francqui, 4
>>>>>>> 1435 Mont-Saint-Guibert
>>>>>>> (T) +32 10 75 02 00
>>>>>>>
>>>>>>> *euranova.eu <http://euranova.eu/>*
>>>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>>>
>>>>>>> ♻ Be green, keep it on the screen
>>>>>>
>>>>>
>>>>> --
>>>>> *Laurent Exsteens*
>>>>> Data Engineer
>>>>> (M) +32 (0) 486 20 48 36
>>>>>
>>>>> *EURA NOVA*
>>>>> Rue Emile Francqui, 4
>>>>> 1435 Mont-Saint-Guibert
>>>>> (T) +32 10 75 02 00
>>>>>
>>>>> *euranova.eu <http://euranova.eu/>*
>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>
>>>>> ♻ Be green, keep it on the screen
>>>>
>>>
>>> ♻ Be green, keep it on the screen
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00
>
> *euranova.eu <http://euranova.eu/>*
> *research.euranova.eu* <http://research.euranova.eu/>
>
> ♻ Be green, keep it on the screen
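Once release-1.12 is available, the updating stream produced by a keep-last-row deduplication could be written to a compacted Kafka topic through the upsert-kafka connector Jark mentions. The following is a rough sketch only, reusing the column and broker names from Laurent's example; option names should be verified against the 1.12 documentation:

CREATE TEMPORARY TABLE sat_customers_address_upsert (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (customer_pk) NOT ENFORCED  -- upsert-kafka requires a primary key; it becomes the Kafka message key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',    -- serializes the primary key columns into the message key
  'value.format' = 'csv'   -- serializes the row into the message value
);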