The query I'm testing now (trying to avoid the deduplication query because of its tombstones) is *almost* correct, but there are two questions I can't find an answer to:

1. Some of the *id*s simply stop being produced at some point.
2. Does the TUMBLE window select only the records whose upd_ts is new, or will the query always produce all the records in the dynamic table *stats_topic* with their max(upd_ts)?

Here is the query:
CREATE TABLE stats_topic(
    `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
    `ts` BIGINT,
    `xid` BIGINT,
    row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
    WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'stats_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test_group'
)

INSERT INTO sink_table
SELECT DISTINCT t.id, t.account, t.upd_ts
FROM stats_topic t, (
    SELECT id, max(upd_ts) AS maxTs
    FROM stats_topic
    GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)
) s
WHERE t.id = s.id AND t.upd_ts = s.maxTs
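For context, this is roughly how I wire everything together from PyFlink 1.12 (a minimal sketch only; source_ddl, sink_ddl and insert_query are placeholders for the stats_topic DDL above, the sink_table DDL from my earlier mail, and the INSERT statement):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # Blink planner in streaming mode (Flink 1.12)
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .use_blink_planner() \
        .build()
    t_env = TableEnvironment.create(settings)

    # Register source and sink, then submit the INSERT as a streaming job
    t_env.execute_sql(source_ddl)    # CREATE TABLE stats_topic(...)
    t_env.execute_sql(sink_ddl)      # CREATE TABLE sink_table(...)
    t_env.execute_sql(insert_query)  # INSERT INTO sink_table SELECT ...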
Thank you!

On Thu, Feb 11, 2021 at 1:36 PM meneldor <menel...@gmail.com> wrote:

>> Are you sure that the null records are not actually tombstone records? If
>> you use upsert tables you usually want to have them + compaction. Or how
>> else will you deal with deletions?
>
> Yes, they are tombstone records, but I cannot avoid them because the
> deduplication query can't produce an append-only connector on a LastRow.
>
>> What do you want to achieve? CDC records should be deduplicated by
>> definition.
>> I'm assuming that you want to aggregate the state to the current state.
>> If so, how do you decide when the record is complete (e.g. no future
>> updates) and can be written?
>> I have the feeling that you are using CDC at a place where you don't want
>> to use it, so maybe it helps to first explain your use case. Is stream
>> processing a good fit for you in the first place?
>
> Yes, I want to aggregate the state to the current state. The problem is
> that the records are going to be merged in a database by an ETL every hour,
> so I don't need all the updates but only the last one; that's why I'm using
> a window function, and the future updates will be evaluated by the MERGE
> query in the ETL as well.
>
> I've changed the query to use max(upd_ts) instead, which does produce an
> append-only stream, and it works, but I'm not 100% sure the result is the
> same:
>
>> INSERT INTO sink_table
>> SELECT DISTINCT id, account, upd_ts
>> FROM stats_topic t, (
>>     SELECT id, account, max(upd_ts) AS maxTs
>>     FROM stats_topic
>>     GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> ) s
>> WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account
>
> Thanks!
>
> On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi,
>>
>> Are you sure that the null records are not actually tombstone records? If
>> you use upsert tables you usually want to have them + compaction. Or how
>> else will you deal with deletions?
>>
>>> Is there anyone who is successfully deduplicating CDC records into
>>> either a kafka topic or S3 files (CSV/parquet)?
>>
>> What do you want to achieve? CDC records should be deduplicated by
>> definition.
>> I'm assuming that you want to aggregate the state to the current state.
>> If so, how do you decide when the record is complete (e.g. no future
>> updates) and can be written?
>>
>> I have the feeling that you are using CDC at a place where you don't want
>> to use it, so maybe it helps to first explain your use case. Is stream
>> processing a good fit for you in the first place?
>>
>> On Tue, Feb 9, 2021 at 10:37 AM meneldor <menel...@gmail.com> wrote:
>>
>>> Unfortunately using row_ts doesn't help. Setting the kafka topic
>>> cleanup.policy to compact is not a very good idea, as it increases CPU
>>> and memory usage and might lead to other problems.
>>> So for now I'll just ignore the null records.
>>> Is there anyone who is successfully deduplicating CDC records into
>>> either a kafka topic or S3 files (CSV/parquet)?
>>>
>>> Thanks!
>>>
>>> On Mon, Feb 8, 2021 at 7:13 PM meneldor <menel...@gmail.com> wrote:
>>>
>>>> Thanks for the quick reply, Timo. I'll test with the row_ts and
>>>> compaction mode suggestions. However, I've read somewhere in the
>>>> archives that an append-only stream is only possible if I extract
>>>> "the first" record from the ranking, which in my case is the oldest
>>>> record.
>>>>
>>>> Regards
>>>>
>>>> On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> could the problem be that you are mixing OVER and TUMBLE windows with
>>>>> each other? The TUMBLE is correctly defined over the time attribute
>>>>> `row_ts`, but the OVER window is defined using the regular column
>>>>> `upd_ts`. This might be why the query is not append-only but updating.
>>>>>
>>>>> Maybe you can split the problem into sub-queries and share the plan
>>>>> with us using .explain()?
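>>>>> For illustration only, from PyFlink the plan of such a statement can
>>>>> be printed roughly like this (t_env being the TableEnvironment and
>>>>> dedup_query a placeholder for the full INSERT ... SELECT below):
>>>>>
>>>>>     # prints the AST and the optimized plan of the statement
>>>>>     print(t_env.explain_sql(dedup_query))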
>>>>> The nulls in upsert-kafka should be gone once you enable compaction
>>>>> mode in Kafka.
>>>>>
>>>>> I hope this helps.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>>>> > Hi,
>>>>> >
>>>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>>>> > I'm pulling in Timo and Jark who might know better.
>>>>> >
>>>>> > [1] https://issues.apache.org/jira/browse/FLINK-19857
>>>>> >
>>>>> > Regards,
>>>>> > Roman
>>>>> >
>>>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
>>>>> >
>>>>> >     Any help please? Is there a way to use the "Last row" from a
>>>>> >     deduplication in an append-only stream, or to tell upsert-kafka
>>>>> >     not to produce *null* records in the sink?
>>>>> >
>>>>> >     Thank you
>>>>> >
>>>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>>>>> >
>>>>> >         Hello,
>>>>> >         Flink 1.12.1 (pyflink)
>>>>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>>>>> >         topic. Here is the SQL:
>>>>> >
>>>>> >             CREATE TABLE stats_topic(
>>>>> >                 `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>>> >                 `ts` BIGINT,
>>>>> >                 `xid` BIGINT,
>>>>> >                 row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>>> >                 WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>>>> >             ) WITH (
>>>>> >                 'connector' = 'kafka',
>>>>> >                 'format' = 'json',
>>>>> >                 'topic' = 'stats_topic',
>>>>> >                 'properties.bootstrap.servers' = 'localhost:9092',
>>>>> >                 'properties.group.id' = 'test_group'
>>>>> >             )
>>>>> >
>>>>> >             CREATE TABLE sink_table(
>>>>> >                 `id` BIGINT,
>>>>> >                 `account` INT,
>>>>> >                 `upd_ts` BIGINT
>>>>> >             ) WITH (
>>>>> >                 'connector' = 'kafka',
>>>>> >                 'format' = 'json',
>>>>> >                 'topic' = 'sink_topic',
>>>>> >                 'properties.bootstrap.servers' = 'localhost:9092',
>>>>> >                 'properties.group.id' = 'test_group'
>>>>> >             )
>>>>> >
>>>>> >             INSERT INTO sink_table
>>>>> >             SELECT
>>>>> >                 id,
>>>>> >                 account,
>>>>> >                 upd_ts
>>>>> >             FROM (
>>>>> >                 SELECT
>>>>> >                     id,
>>>>> >                     account,
>>>>> >                     upd_ts,
>>>>> >                     ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>>>>> >                 FROM stats_topic
>>>>> >                 GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>>>> >             )
>>>>> >             WHERE rownum = 1
>>>>> >
>>>>> >         As there are a lot of CDC records for a single id, I'm using
>>>>> >         ROW_NUMBER() and producing them on a 20-minute interval to the
>>>>> >         sink_topic. The problem is that Flink doesn't allow me to use
>>>>> >         it in combination with the kafka connector:
>>>>> >
>>>>> >             pyflink.util.exceptions.TableException: Table sink
>>>>> >             'default_catalog.default_database.sink_table' doesn't
>>>>> >             support consuming update and delete changes which is
>>>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
>>>>> >
>>>>> >         If I use the *upsert-kafka* connector everything is fine, but
>>>>> >         then I receive empty JSON records in the sink topic:
>>>>> >
>>>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>>>> >             {}
>>>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>>>> >             {}
>>>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>>> >
>>>>> >         Thank you!
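>>>>> >         For reference, an upsert-kafka sink for this schema looks
>>>>> >         roughly like the following sketch (the PRIMARY KEY and the
>>>>> >         key/value format options are what that connector requires;
>>>>> >         everything else mirrors the sink_table DDL above):
>>>>> >
>>>>> >             # sketch only: replaces the plain kafka sink with upsert-kafka
>>>>> >             t_env.execute_sql("""
>>>>> >                 CREATE TABLE sink_table(
>>>>> >                     `id` BIGINT,
>>>>> >                     `account` INT,
>>>>> >                     `upd_ts` BIGINT,
>>>>> >                     PRIMARY KEY (`id`) NOT ENFORCED
>>>>> >                 ) WITH (
>>>>> >                     'connector' = 'upsert-kafka',
>>>>> >                     'topic' = 'sink_topic',
>>>>> >                     'properties.bootstrap.servers' = 'localhost:9092',
>>>>> >                     'key.format' = 'json',
>>>>> >                     'value.format' = 'json'
>>>>> >                 )
>>>>> >             """)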