The query I'm testing now (trying to avoid the deduplication query because
of the tombstones) is *almost* correct, but there are two questions I can't
find an answer to:
1. Some of the *id*s simply stop being produced at some point.
2. Does the TUMBLE window emit only the records whose upd_ts changed within
the window, or will the query always emit every record in the dynamic table
*stats_topic* with its max(upd_ts)? (See the probe sketch after the INSERT
below.)

CREATE TABLE stats_topic(
  `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
  `ts` BIGINT,
  `xid` BIGINT ,
  row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
  WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'stats_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_group'
)
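
For reference, sink_table is unchanged from my earlier mail; I'm repeating it
here so the INSERT below is self-contained:

CREATE TABLE sink_table(
  `id` BIGINT,
  `account` INT,
  `upd_ts` BIGINT
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_group'
)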


INSERT INTO sink_table
SELECT DISTINCT t.id, t.account, t.upd_ts
FROM stats_topic t, (
   SELECT id, max(upd_ts) AS maxTs
   FROM stats_topic
   GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)
) s
WHERE t.id = s.id AND t.upd_ts = s.maxTs
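
To check what the 20-minute windows actually emit (question 2), I'm planning
to run a probe along the lines of the sketch below against the same topic.
TUMBLE_START/TUMBLE_END are the standard group-window helpers; the column
references mirror the queries above (adjust to data.id / data.upd_ts if the
fields are still nested in `data`):

-- probe sketch only, not meant for the sink: one row per (id, window),
-- emitted when the watermark passes the window end
SELECT
  id,
  TUMBLE_START(row_ts, INTERVAL '20' MINUTE) AS win_start,
  TUMBLE_END(row_ts, INTERVAL '20' MINUTE) AS win_end,
  max(upd_ts) AS maxTs,
  count(*) AS rows_in_window
FROM stats_topic
GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)

If an id shows no row for some window here, it had no records inside that
window (or they arrived behind the watermark), which is what I want to rule
out for question 1.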


Thank you!

On Thu, Feb 11, 2021 at 1:36 PM meneldor <menel...@gmail.com> wrote:

> Are you sure that the null records are not actually tombstone records? If
>> you use upsert tables you usually want to have them + compaction. Or how
>> else will you deal with deletions?
>
> Yes, they are tombstone records, but I cannot avoid them because the
> deduplication query can't produce an append-only stream when keeping the
> last row.
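>
> (From what I've read in the archives, only the keep-first form of the
> ranking stays append-only, i.e. roughly the sketch below, and that keeps
> the oldest row per id rather than the latest one:)
>
> SELECT id, account, upd_ts
> FROM (
>   SELECT id, account, upd_ts,
>     ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts ASC) AS rownum
>   FROM stats_topic
> )
> WHERE rownum = 1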
>
> What do you want to achieve? CDC records should be deduplicated by
>> definition.
>> I'm assuming that you want to aggregate the state to the current state.
>> If so, how do you decide when the record is complete (e.g. no future
>> updates) and can be written?
>> I have the feeling that you are using CDC at a place where you don't want
>> to use it, so maybe it helps to first explain your use case. Is stream
>> processing a good fit for you in the first place?
>
> Yes, I want to aggregate the state to the current state. The problem is
> that the records are going to be merged into a database by an ETL every
> hour, so I don't need all the updates but only the last one. That's why I'm
> using a window function; future updates will be handled by the MERGE query
> in the ETL as well.
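>
> (Conceptually the hourly merge looks roughly like the sketch below; the
> target and staging table names are just placeholders:)
>
> MERGE INTO account_stats AS tgt
> USING hourly_batch AS src
>   ON tgt.id = src.id
> WHEN MATCHED AND src.upd_ts > tgt.upd_ts THEN
>   UPDATE SET account = src.account, upd_ts = src.upd_ts
> WHEN NOT MATCHED THEN
>   INSERT (id, account, upd_ts) VALUES (src.id, src.account, src.upd_ts);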
>
> I've changed the query to use max(upd_ts) instead, which produces an
> append-only stream, and it works, but I'm not 100% sure the result is the
> same:
>
>> INSERT INTO sink_table
>> SELECT DISTINCT t.id, t.account, t.upd_ts
>> FROM stats_topic t, (
>>    SELECT id, account, max(upd_ts) AS maxTs
>>    FROM stats_topic
>>    GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> ) s
>> WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account
>
>
> Thanks!
>
> On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi,
>>
>> Are you sure that the null records are not actually tombstone records? If
>> you use upsert tables you usually want to have them + compaction. Or how
>> else will you deal with deletions?
>>
>>> Is there anyone who is successfully deduplicating CDC records into either
>>> a Kafka topic or S3 files (CSV/Parquet)?
>>>
>> What do you want to achieve? CDC records should be deduplicated by
>> definition.
>> I'm assuming that you want to aggregate the state to the current state.
>> If so, how do you decide when the record is complete (e.g. no future
>> updates) and can be written?
>>
>> I have the feeling that you are using CDC at a place where you don't want
>> to use it, so maybe it helps to first explain your use case. Is stream
>> processing a good fit for you in the first place?
>>
>> On Tue, Feb 9, 2021 at 10:37 AM meneldor <menel...@gmail.com> wrote:
>>
>>> Unfortunately using row_ts doesn't help. Setting the Kafka topic
>>> cleanup.policy to compact is not a very good idea, as it increases CPU
>>> and memory usage and might lead to other problems.
>>> So for now I'll just ignore the null records. Is there anyone who is
>>> successfully deduplicating CDC records into either a Kafka topic or S3
>>> files (CSV/Parquet)?
>>>
>>> Thanks!
>>>
>>> On Mon, Feb 8, 2021 at 7:13 PM meneldor <menel...@gmail.com> wrote:
>>>
>>>> Thanks for the quick reply, Timo. I'll test with the row_ts and
>>>> compaction mode suggestions. However, I've read somewhere in the archives
>>>> that an append-only stream is only possible if I extract "the first"
>>>> record from the ranking, which in my case is the oldest record.
>>>>
>>>> Regards
>>>>
>>>> On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> could the problem be that you are mixing the OVER and TUMBLE windows
>>>>> with each other? The TUMBLE is correctly defined over the time attribute
>>>>> `row_ts`, but the OVER window is defined using a regular column
>>>>> `upd_ts`. This might be the reason why the query is not append-only but
>>>>> updating.
>>>>>
>>>>> Maybe you can split the problem into sub queries and share the plan
>>>>> with us using .explain()?
>>>>>
>>>>> The nulls in upsert-kafka should be gone once you enable compaction
>>>>> mode in Kafka.
>>>>>
>>>>> I hope this helps.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>>>> > Hi,
>>>>> >
>>>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>>>> > I'm pulling in Timo and Jark who might know better.
>>>>> >
>>>>> > https://issues.apache.org/jira/browse/FLINK-19857
>>>>> >
>>>>> > Regards,
>>>>> > Roman
>>>>> >
>>>>> >
>>>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
>>>>> >
>>>>> >     Any help please? Is there a way to use the "Last row" from a
>>>>> >     deduplication in an append-only stream or tell upsert-kafka to not
>>>>> >     produce *null* records in the sink?
>>>>> >
>>>>> >     Thank you
>>>>> >
>>>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>>>>> >
>>>>> >         Hello,
>>>>> >         Flink 1.12.1(pyflink)
>>>>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>>>>> >         topic.  Here is the SQL:
>>>>> >
>>>>> >             CREATE TABLE stats_topic(
>>>>> >               `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>>> >               `ts` BIGINT,
>>>>> >               `xid` BIGINT,
>>>>> >               row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>>> >               WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>>>> >             ) WITH (
>>>>> >               'connector' = 'kafka',
>>>>> >               'format' = 'json',
>>>>> >               'topic' = 'stats_topic',
>>>>> >               'properties.bootstrap.servers' = 'localhost:9092',
>>>>> >               'properties.group.id' = 'test_group'
>>>>> >             )
>>>>> >
>>>>> >             CREATE TABLE sink_table(
>>>>> >               `id` BIGINT,
>>>>> >               `account` INT,
>>>>> >               `upd_ts` BIGINT
>>>>> >             ) WITH (
>>>>> >               'connector' = 'kafka',
>>>>> >               'format' = 'json',
>>>>> >               'topic' = 'sink_topic',
>>>>> >               'properties.bootstrap.servers' = 'localhost:9092',
>>>>> >               'properties.group.id' = 'test_group'
>>>>> >             )
>>>>> >
>>>>> >
>>>>> >             INSERT INTO sink_table
>>>>> >             SELECT
>>>>> >               id,
>>>>> >               account,
>>>>> >               upd_ts
>>>>> >             FROM (
>>>>> >               SELECT
>>>>> >                 id,
>>>>> >                 account,
>>>>> >                 upd_ts,
>>>>> >                 ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>>>>> >               FROM stats_topic
>>>>> >               GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>>>> >             )
>>>>> >             WHERE rownum = 1
>>>>> >
>>>>> >
>>>>> >         As there are a lot of CDC records for a single id, I'm using
>>>>> >         ROW_NUMBER() and produce them on a 20-minute interval to the
>>>>> >         sink_topic. The problem is that Flink doesn't allow me to use
>>>>> >         it in combination with the kafka connector:
>>>>> >
>>>>> >             pyflink.util.exceptions.TableException: Table sink
>>>>> >             'default_catalog.default_database.sink_table' doesn't
>>>>> >             support consuming update and delete changes which is
>>>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
>>>>> >
>>>>> >
>>>>> >         If I use the *upsert-kafka* connector everything is fine, but
>>>>> >         then I receive empty JSON records in the sink topic:
>>>>> >
>>>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>>>> >             {}
>>>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>>>> >             {}
>>>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>>> >
>>>>> >
>>>>> >         Thank you!
>>>>> >
>>>>>
>>>>>
