Hello,

I'm getting an error  in Flink SQL when reading from kafka, deduplicating
records and sending them back to Kafka.

The behavior I want is the following:

*input:*
| client_number | address |
| ------------------- | ----------- |
| 1                      | addr1     |
| 1                      | addr1     |
| 1                      | addr2     |
| 1                      | addr2     |
| 1                      | addr1     |
| 1                      | addr1     |

*output:*
| client_number | address |
| ------------------- | ----------- |
| 1                      | addr1     |
| 1                      | addr2     |
| 1                      | addr1     |

The error seems to say that the type of stream created by the deduplication
query is of "update & delete" type, while kafka only supports append-only:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming
update and delete changes which is produced by node
Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
$2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


--> Is there a way to create an append only query from this kind of
deduplication query (see my code here below)?
--> Would that work if I would use, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL?
(here I generated a processing date to allow ordering during deduplication)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to
Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
`client_number` INT,
`name` VARCHAR(100),
`address` VARCHAR(100)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'csv',
'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
'properties.group.id' = 'flinkSQL',
'topic' = 'customers',
'csv.field-delimiter' = ';',
'scan.startup.mode' = 'earliest-offset'
);



-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
, sha256(cast(client_number as STRING)) AS customer_pk
, current_timestamp AS load_date
, 'Kafka topic: customers' AS record_source
FROM customers;



-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address as
SELECT customer_pk
, client_number
, load_date
, address
FROM (
SELECT customer_pk
, client_number
, load_date
, record_source
, address
, ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER
BY load_date ASC) AS rownum
FROM metadata
) where rownum = 1;






-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
`customer_pk` VARCHAR(64),
`client_number` INT,
`address` VARCHAR(100)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'csv',
'properties.bootstrap.servers' =
'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
'properties.group.id' = 'flinkSQL',
'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
, client_number
, address
FROM dedup_address;




-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu <http://euranova.eu/>*

*research.euranova.eu* <http://research.euranova.eu/>

-- 
♻ Be green, keep it on the screen

Reply via email to