Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1].
I'm pulling in Timo and Jark, who might know better.

[1] https://issues.apache.org/jira/browse/FLINK-19857
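
In case it helps, here is a rough sketch of how the deduplication could look
without the GROUP BY/TUMBLE, ordering by the event-time attribute row_ts.
Whether the planner emits this as an insert-only stream that the plain
'kafka' sink accepts depends on the ticket above being in your version, so
please treat it as a sketch rather than a verified solution:

  -- Last-row deduplication per id, ordered by the event-time attribute row_ts.
  -- (If the fields are nested under `data` as in the DDL below, select
  --  data.id AS id, data.account AS account, data.upd_ts AS upd_ts instead.)
  INSERT INTO sink_table
  SELECT id, account, upd_ts
  FROM (
    SELECT
      id,
      account,
      upd_ts,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts DESC) AS rownum
    FROM stats_topic
  )
  WHERE rownum = 1;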

Regards,
Roman


On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:

> Any help, please? Is there a way to use the "Last row" from a deduplication
> in an append-only stream, or to tell upsert-kafka not to produce *null*
> records in the sink?
>
> Thank you
>
> On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>
>> Hello,
>> Flink 1.12.1 (PyFlink)
>> I am deduplicating CDC records coming from Maxwell in a Kafka topic.
>> Here is the SQL:
>>
>> CREATE TABLE stats_topic(
>>>           `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>           `ts` BIGINT,
>>>           `xid` BIGINT ,
>>>           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>           WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>>         ) WITH (
>>>           'connector' = 'kafka',
>>>           'format' = 'json',
>>>           'topic' = 'stats_topic',
>>>           'properties.bootstrap.servers' = 'localhost:9092',
>>>           'properties.group.id' = 'test_group'
>>>         )
>>>
>>> CREATE TABLE sink_table(
>>>           `id` BIGINT,
>>>           `account` INT,
>>>           `upd_ts` BIGINT
>>>         ) WITH (
>>>           'connector' = 'kafka',
>>>           'format' = 'json',
>>>           'topic' = 'sink_topic',
>>>           'properties.bootstrap.servers' = 'localhost:9092',
>>>           'properties.group.id' = 'test_group'
>>>         )
>>>
>>>
>>> INSERT INTO sink_table
>>> SELECT
>>>   id,
>>>   account,
>>>   upd_ts
>>> FROM (
>>>   SELECT
>>>     id,
>>>     account,
>>>     upd_ts,
>>>     ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>>>   FROM stats_topic
>>>   GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>> )
>>> WHERE rownum = 1
>>>
>>
>>  As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER()
>> and producing them to the sink_topic on a 20-minute interval. The problem is
>> that Flink doesn't allow me to use it in combination with the kafka
>> connector:
>>
>>> pyflink.util.exceptions.TableException: Table sink
>>> 'default_catalog.default_database.sink_table' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>>> select=[$f0, $f1, $f2])
>>>
>>
>> If I use the *upsert-kafka* connector everything is fine, but then I
>> receive empty JSON records in the sink topic:
>>
>>> {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>> {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>> {}
>>> {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>> {}
>>> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>
>>
>> Thank you!
>>
>
