Hi, AFAIK this should be supported in 1.12 via FLINK-19568 [1]. I'm pulling in Timo and Jark, who might know better.
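For context: the keep-last-row ROW_NUMBER() pattern produces an updating stream (a newer version of an id retracts the previously emitted row), while the plain 'kafka' sink is append-only, which is why the planner rejects the plan. upsert-kafka can consume the updates, but it encodes DELETE changes as Kafka tombstones, i.e. records with a null value; those are most likely the empty {} entries you see when reading the sink topic back as JSON.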
[1] https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman

On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
> Any help, please? Is there a way to use the "last row" from a deduplication
> in an append-only stream, or to tell upsert-kafka not to produce *null*
> records in the sink?
>
> Thank you
>
> On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>> Hello,
>> Flink 1.12.1 (PyFlink)
>>
>> I am deduplicating CDC records coming from Maxwell in a Kafka topic.
>> Here is the SQL:
>>
>>> CREATE TABLE stats_topic (
>>>     `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>     `ts` BIGINT,
>>>     `xid` BIGINT,
>>>     row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>     WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'format' = 'json',
>>>     'topic' = 'stats_topic',
>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>     'properties.group.id' = 'test_group'
>>> );
>>>
>>> CREATE TABLE sink_table (
>>>     `id` BIGINT,
>>>     `account` INT,
>>>     `upd_ts` BIGINT
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'format' = 'json',
>>>     'topic' = 'sink_topic',
>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>     'properties.group.id' = 'test_group'
>>> );
>>>
>>> INSERT INTO sink_table
>>> SELECT
>>>     id,
>>>     account,
>>>     upd_ts
>>> FROM (
>>>     SELECT
>>>         id,
>>>         account,
>>>         upd_ts,
>>>         ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>>>     FROM stats_topic
>>>     GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>> )
>>> WHERE rownum = 1;
>>
>> As there are a lot of CDC records for a single id, I'm using ROW_NUMBER()
>> and producing them to the sink_topic on a 20-minute interval. The problem
>> is that Flink doesn't allow me to use it in combination with the kafka
>> connector:
>>
>>> pyflink.util.exceptions.TableException: Table sink
>>> 'default_catalog.default_database.sink_table' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>>> select=[$f0, $f1, $f2])
>>
>> If I use the *upsert-kafka* connector everything is fine, but then I
>> receive empty JSON records in the sink topic:
>>
>>> {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>> {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>> {}
>>> {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>> {}
>>> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>
>> Thank you!
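PS: in case it helps, a minimal sketch of the sink DDL that upsert-kafka expects (untested; choosing `id` as the primary key and json for both key and value formats are my assumptions, not taken from your setup):

CREATE TABLE sink_table (
    `id` BIGINT,
    `account` INT,
    `upd_ts` BIGINT,
    -- upsert-kafka requires an explicit, NOT ENFORCED primary key;
    -- the key columns are serialized into the Kafka record key
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'sink_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- upsert-kafka takes separate key/value formats instead of a single 'format'
    'key.format' = 'json',
    'value.format' = 'json'
);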