[ 
https://issues.apache.org/jira/browse/FLINK-29494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rashmin Patel updated FLINK-29494:
----------------------------------
    Summary: ChangeLogNormalize operator causes undesired firing of past 
windows after state restoration  (was: ChangeLogNormalize operator causes 
undesired firing of past windows after restoration)

> ChangeLogNormalize operator causes undesired firing of past windows after 
> state restoration
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29494
>                 URL: https://issues.apache.org/jira/browse/FLINK-29494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Runtime
>    Affects Versions: 1.14.2
>         Environment: Flink version: 1.14.2
> API: Flink SQL
>            Reporter: Rashmin Patel
>            Priority: Critical
>
> *Issue Summary:*
> While doing GroupWindowAggregation on stream produced by `upsert-kafka` 
> connector, I am facing an unexpected behaviour, where restoring a job from 
> checkpoint/savepoint is causing past windows(wrt last watermark generated by 
> previous job run) to fire.
> *Detailed Description:* 
> My program is written in Flink SQL.
> Watermark Strategy: max-bounded-out-of-orderness with periodic generation 
> (with default 200ms interval)
> Rowtime field: `updated_at_ts` which is monotonically increasing field in 
> changelog stream produced by debezium.
> Below is the runtime topology of Flink Job
> Kafka Source (upsert mode) >>  ChangeLogNormalize >> GroupWindowAggregate >> 
> PostgresSink
> *Job Logic Context:*
> I am reading a cdc-stream from kafka and record schema looks something like 
> this: 
> (pk, loan_acc_no, status, created_at, *updated_at,* __op). 
> Now I want to count number of distinct loan_acc_no with *hourly* window. So I 
> have created watermark on {{updated_at}} field and hence tumbling also on 
> {{updated_at}}
> *Usual scenario which triggers late windows:*
> Now suppose that for the previous job run, the latest running window 
> was{color:#0747a6} {color:#00875a}{{2022-09-10 08:59:59}}{color}{color} 
> (win_end time) and job had processed events till 
> {color:#0747a6}{{{}08:30{}}}.{color} 
> Now upon restarting a job, suppose I got a first cdc event like (pk1, loan_1, 
> "approved", {color:#00875a}{{2022-09-02 00:00:00}}{color}, 
> {color:#00875a}{{2022-09-10 08:45:00}}{color}, "u")  say it {*}E1{*}, which 
> is not a late event wrt the last watermark generated by source operator in 
> previous job run.
> Now there is ChangeLogNormalize operator in between kafka source and window 
> operator. So, when kafka source forwards this *E1* to ChangeLogNormalize, it 
> will emit two records which will be of type -U and +U, and will be passed as 
> input to window operator.
>  -U (pk1, loan_1, "pending", {color:#00875a}{{2022-09-02 00:00:00}}{color}, 
> {color:#00875a}{{2022-09-05 00:00:00}}{color}, "u") => previous state of 
> record with key `{_}pk1{_}`
> +U (pk1, loan_1, "approved", {color:#00875a}{{2022-09-02 00:00:00}}{color}, 
> {color:#00875a}{{2022-09-10 08:45:00}}{color}, "u") => same as E1
> So this -U type of events are causing the problem since their {{updated_at}} 
> can be of any timestamp in the past and we are tumbling on this field. As per 
> periodic watermarks, during the first watermark interval (i.e 200 ms 
> default), no events will be considered late, so these -U events will create 
> their window state and upon receiving first high watermark, they will fire.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to