[ https://issues.apache.org/jira/browse/FLINK-29494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rashmin Patel updated FLINK-29494:
----------------------------------
    Component/s: Runtime / Checkpointing

> ChangeLogNormalize operator causes unexpected firing of past windows after
> state restoration
> --------------------------------------------------------------------------
>
>                 Key: FLINK-29494
>                 URL: https://issues.apache.org/jira/browse/FLINK-29494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.14.2
>         Environment: Flink version: 1.14.2
> API: Flink SQL
>            Reporter: Rashmin Patel
>            Priority: Critical
>
> *Issue Summary:*
> While doing a GroupWindowAggregation on a stream produced by the `upsert-kafka`
> connector, I am seeing unexpected behaviour: restoring the job from a
> checkpoint/savepoint causes past windows (relative to the last watermark
> generated by the previous job run) to fire.
>
> *Detailed Description:*
> My program is written in Flink SQL.
> Watermark strategy: max bounded out-of-orderness with periodic generation
> (default 200 ms interval).
> Rowtime field: `updated_at_ts`, a monotonically increasing field in the
> changelog stream produced by Debezium.
> Runtime topology of the Flink job:
> Kafka Source (upsert mode) >> ChangeLogNormalize >> GroupWindowAggregate >> PostgresSink
>
> *Job Logic Context:*
> I am reading a CDC stream from Kafka, and the record schema looks like this:
> (pk, loan_acc_no, status, created_at, *updated_at*, __op)
> I want to count the number of distinct loan_acc_no per *hourly* window, so I
> have defined the watermark on the {{updated_at}} field and hence also tumble
> on {{updated_at}}.
>
> *Usual scenario which triggers unexpected late windows:*
> Suppose that in the previous job run the latest running window was
> {{2022-09-10 08:59:59}} (win_end time) and the job had processed events up to
> {{08:30}}.
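The setup described above can be sketched in Flink SQL roughly as follows. This is a minimal illustration, not the reporter's actual job: the table name, topic, formats, and the 5-second watermark delay are all assumptions.

```sql
-- Hypothetical upsert-kafka source; names, topic, and formats are illustrative.
CREATE TABLE loans (
  pk          STRING,
  loan_acc_no STRING,
  status      STRING,
  created_at  TIMESTAMP(3),
  updated_at  TIMESTAMP(3),
  -- bounded out-of-orderness watermark on the rowtime field (delay assumed)
  WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
  PRIMARY KEY (pk) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'loans_cdc',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Hourly tumbling window counting distinct loan accounts,
-- tumbling on the same updated_at field that carries the watermark.
SELECT
  TUMBLE_START(updated_at, INTERVAL '1' HOUR) AS win_start,
  TUMBLE_END(updated_at, INTERVAL '1' HOUR)   AS win_end,
  COUNT(DISTINCT loan_acc_no)                 AS distinct_loans
FROM loans
GROUP BY TUMBLE(updated_at, INTERVAL '1' HOUR);
```

Because the source declares a primary key in upsert mode, the planner inserts a ChangeLogNormalize operator between the source and the window aggregate, which is where the -U records described below originate.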
> Now, upon restarting the job, suppose the first CDC event is
> *E1* = (pk1, loan_1, "approved", {{2022-09-02 00:00:00}}, {{2022-09-10 08:45:00}}, "u"),
> which is not a late event with respect to the last watermark generated by the
> source operator in the previous job run.
> There is a ChangeLogNormalize operator between the Kafka source and the
> window operator. When the Kafka source forwards *E1* to ChangeLogNormalize,
> it emits two records, of type -U and +U, which are passed as input to the
> window operator:
> -U (pk1, loan_1, "pending", {{2022-09-02 00:00:00}}, {{2022-09-05 00:00:00}}, "u") => previous state of the record with key {{pk1}}
> +U (pk1, loan_1, "approved", {{2022-09-02 00:00:00}}, {{2022-09-10 08:45:00}}, "u") => same as E1
> These -U events are what cause the problem, since their {{updated_at}} can
> carry any timestamp in the past and we are tumbling on this field. With
> periodic watermarks, no events are considered late during the first watermark
> interval (the 200 ms default), so these -U events create window state, and
> upon receiving the first high watermark, the windows they created fire.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)