[ 
https://issues.apache.org/jira/browse/FLINK-29494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rashmin Patel updated FLINK-29494:
----------------------------------
    Description: 
*Issue Summary:*

While doing GroupWindowAggregation on a stream produced by the `upsert-kafka` 
connector, I am facing an unexpected behaviour where restoring a job from a 
checkpoint/savepoint causes past windows (with respect to the last watermark 
generated by the previous job run) to fire.

*Detailed Description:* 

My program is written in Flink SQL.

Watermark Strategy: bounded-out-of-orderness with periodic generation (default 
200 ms interval)

Rowtime field: `updated_at_ts`, which is a monotonically increasing field in the 
changelog stream produced by Debezium.

Below is the runtime topology of the Flink job:

Kafka Source (upsert mode) >>  ChangeLogNormalize >> GroupWindowAggregate >> 
PostgresSink

*Job Logic Context:*
I am reading a CDC stream from Kafka, and the record schema looks something like 
this:

(pk, loan_acc_no, status, created_at, *updated_at*, __op).

Now I want to count the number of distinct loan_acc_no values per *hourly* window, 
so I have defined the watermark on the {{updated_at}} field and hence the tumbling 
window is also on {{updated_at}}.
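
For reference, here is a minimal Flink SQL sketch of the setup described above. The 
table/topic names, connector options, column types, and the out-of-orderness bound 
are illustrative assumptions, not the exact production DDL:

{code:sql}
-- upsert-kafka source carrying the Debezium changelog; the watermark is
-- declared on the rowtime field updated_at (bounded out-of-orderness).
CREATE TABLE loan_changes (
  pk           BIGINT,
  loan_acc_no  STRING,
  status       STRING,
  created_at   TIMESTAMP(3),
  updated_at   TIMESTAMP(3),
  WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND,
  PRIMARY KEY (pk) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'loan_changes',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- hourly tumbling window counting distinct loan_acc_no values; in the actual
-- job the result is written to a Postgres (JDBC) sink
SELECT
  TUMBLE_START(updated_at, INTERVAL '1' HOUR) AS win_start,
  TUMBLE_END(updated_at, INTERVAL '1' HOUR)   AS win_end,
  COUNT(DISTINCT loan_acc_no)                 AS distinct_loans
FROM loan_changes
GROUP BY TUMBLE(updated_at, INTERVAL '1' HOUR);
{code}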

*Typical scenario that triggers unexpected late windows:*

Suppose that in the previous job run, the latest open window was 
{color:#0747a6}{{2022-09-10 08:59:59}}{color} (win_end time) and the job had 
processed events up to {{08:30}}.

Upon restarting the job, suppose the first CDC event received is (pk1, loan_1, 
"approved", {color:#00875a}{{2022-09-02 00:00:00}}{color}, 
{color:#00875a}{{2022-09-10 08:45:00}}{color}, "u"), call it {*}E1{*}. This is 
not a late event with respect to the last watermark generated by the source 
operator in the previous job run.

There is a ChangeLogNormalize operator between the Kafka source and the window 
operator. When the Kafka source forwards *E1* to ChangeLogNormalize, it emits two 
records, of type -U and +U, which are passed as input to the window operator:

-U (pk1, loan_1, "pending", {color:#00875a}{{2022-09-02 00:00:00}}{color}, 
{color:#00875a}{{2022-09-05 00:00:00}}{color}, "u") => the previous state of the 
record with key {{pk1}}
+U (pk1, loan_1, "approved", {color:#00875a}{{2022-09-02 00:00:00}}{color}, 
{color:#00875a}{{2022-09-10 08:45:00}}{color}, "u") => same as E1

These -U events cause the problem, since their {{updated_at}} can be any timestamp 
in the past and we are tumbling on this field. Watermarks are not part of the 
checkpointed state, so after the restore the operators' current watermark starts 
from scratch; with periodic watermarks, no events are considered late during the 
first watermark interval (200 ms by default). These -U events therefore create 
state for long-past windows, and as soon as the first high watermark arrives, the 
windows created by these events fire.


> ChangeLogNormalize operator causes unexpected firing of past windows after 
> state restoration
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29494
>                 URL: https://issues.apache.org/jira/browse/FLINK-29494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Runtime
>    Affects Versions: 1.14.2
>         Environment: Flink version: 1.14.2
> API: Flink SQL
>            Reporter: Rashmin Patel
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
