[ 
https://issues.apache.org/jira/browse/FLINK-39932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088848#comment-18088848
 ] 

Sameer Devgan commented on FLINK-39932:
---------------------------------------

I have identified the fix and would like to be assigned this ticket to open a PR

> [flink-datastream] Merging-Window onRecord Callback Receives Wrong Window 
> Context in DataStream V2
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39932
>                 URL: https://issues.apache.org/jira/browse/FLINK-39932
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 2.1.3, 2.1.4
>            Reporter: Sameer Devgan
>            Priority: Major
>
> {{{}In OneInputWindowProcessOperator.processElement{}}}, after 
> {{MergingWindowSet.addWindow()}} computes the post-merge result 
> ({{{}actualWindow{}}}), two lines in the merging branch still reference the 
> original pre-merge candidate window ({{{}window{}}}) instead of 
> {{actualWindow.}}
> {code:java}
> outputCollector.setTimestamp(window.maxTimestamp());{code}
> {code:java}
> windowFunctionContext.setWindow(window);{code}
> {{while the trigger context has the correct refrence of actual window ( or 
> the merged window) , windowFunctionContext}}{{ and }}{{outputCollector}}{{ 
> are set from }}{{window}} (bug ripples to here )
> {code:java}
> triggerContext.setWindow(actualWindow);{code}
> {{*Impact*}}
> {{When two session windows merge, the WindowContext}} exposed to the user 
> function reports the stale window's bounds, not the merged window bounds so 
> any logic inside {{onRecord}} that reads {{windowContext.getStartTime()}} or 
> {{getEndTime()}} operates on stale pre-merge data.
> Also ,
> {code:java}
> outputCollector.setTimestamp(window.maxTimestamp()){code}
> {{assigns the candidate window's max-timestamp to records emitted during 
> }}{{{}onRecord{}}}{{{}. for a late record all emitted records carry an 
> incorrectly early timestamp of the window and not the merged window.{}}}
> {{Reproduce }}
> {{1) Create a session window with with a inactivity gap of 10s }}
> {{{}2) Process two records at R1:timestamp 0s and R2:timestanp 
> 5s{}}}windowFunctionContext.setWindow(window) uses the candidate
> [5_000, 15_000) instead of actualWindow [0, 15_000), so getStartTime() 
> returns 5_000 instead of 0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to