[ 
https://issues.apache.org/jira/browse/FLINK-39932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sameer Devgan updated FLINK-39932:
----------------------------------
    Description: 
In {{OneInputWindowProcessOperator.processElement}}, after 
{{MergingWindowSet.addWindow()}} computes the post-merge result 
({{actualWindow}}), two lines in the merging branch still reference the 
original pre-merge candidate window ({{window}}) instead of 
{{actualWindow}}:
{code:java}
outputCollector.setTimestamp(window.maxTimestamp());{code}
{code:java}
windowFunctionContext.setWindow(window);{code}
{{windowFunctionContext}} and {{outputCollector}} are both set from {{window}}, 
so the bug propagates to them, while the trigger context correctly references 
the actual (merged) window:


{code:java}
triggerContext.setWindow(actualWindow);{code}
*Impact*

When two session windows merge, the {{WindowContext}} exposed to the user 
function reports the bounds of the stale candidate window rather than those of 
the merged window, so any logic inside {{onRecord}} that reads 
{{windowContext.getStartTime()}} or {{getEndTime()}} operates on stale 
pre-merge data.
Also,
{code:java}
outputCollector.setTimestamp(window.maxTimestamp()){code}
assigns the candidate window's max timestamp to records emitted during 
{{onRecord}}. For a late record, all emitted records carry the incorrectly 
early timestamp of the candidate window instead of the merged window's.
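A minimal standalone sketch of the timestamp effect, not Flink code: {{Win}}, {{candidateFor}} and {{cover}} are hypothetical stand-ins, and it assumes that "late record" means a record arriving out of order with an earlier timestamp and that {{maxTimestamp()}} is {{end - 1}}, as in Flink's {{TimeWindow}}.

```java
public class LateRecordTimestampSketch {
    /** Hypothetical stand-in for a half-open time window [start, end). */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Largest timestamp that still belongs to the window. */
        long maxTimestamp() {
            return end - 1;
        }

        /** Smallest window covering both this window and the other one. */
        Win cover(Win other) {
            return new Win(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    static final long GAP_MS = 10_000L; // session inactivity gap of 10s

    /** Candidate session window assigned to a record with the given timestamp. */
    static Win candidateFor(long timestampMs) {
        return new Win(timestampMs, timestampMs + GAP_MS);
    }

    public static void main(String[] args) {
        Win existing = candidateFor(5_000L);    // already in state: [5_000, 15_000)
        Win candidate = candidateFor(0L);       // out-of-order record at 0: [0, 10_000)
        Win merged = existing.cover(candidate); // post-merge window: [0, 15_000)

        System.out.println("candidate maxTimestamp = " + candidate.maxTimestamp()); // 9999
        System.out.println("merged maxTimestamp    = " + merged.maxTimestamp());    // 14999
    }
}
```

Under these assumptions, a collector stamped from the candidate window would emit records at 9999 rather than 14999.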
*Reproduce*

1) Create a session window with an inactivity gap of 10s.
2) Process two records: R1 at timestamp 0s and R2 at timestamp 5s.

{{windowFunctionContext.setWindow(window)}} uses the candidate 
[5_000, 15_000) instead of {{actualWindow}} [0, 15_000), so {{getStartTime()}} 
returns 5_000 instead of 0.
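
The window arithmetic behind the reproduction can be checked with a small standalone sketch. It does not use Flink classes: {{Win}}, {{candidateFor}} and {{cover}} are hypothetical helpers that only mimic how a session candidate window is merged with the window already in state.

```java
public class SessionMergeBoundsSketch {
    /** Hypothetical stand-in for a half-open time window [start, end). */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Two half-open windows overlap if each starts before the other ends. */
        boolean intersects(Win other) {
            return start < other.end && other.start < end;
        }

        /** Smallest window covering both this window and the other one. */
        Win cover(Win other) {
            return new Win(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static final long GAP_MS = 10_000L; // session inactivity gap of 10s

    /** Candidate session window assigned to a record with the given timestamp. */
    static Win candidateFor(long timestampMs) {
        return new Win(timestampMs, timestampMs + GAP_MS);
    }

    public static void main(String[] args) {
        Win afterR1 = candidateFor(0L);       // R1 at 0s: [0, 10000)
        Win candidate = candidateFor(5_000L); // R2 at 5s: [5000, 15000)

        // The candidate overlaps the existing session, so the two are merged.
        Win actual = afterR1.intersects(candidate) ? afterR1.cover(candidate) : candidate;

        System.out.println("candidate (what the context is set to) = " + candidate);
        System.out.println("actual (what it should be set to)      = " + actual);
        System.out.println("start seen by onRecord: " + candidate.start
                + ", expected: " + actual.start);
    }
}
```

The last line prints a start of 5000 against an expected 0, matching the values reported above.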

  was:
In {{OneInputWindowProcessOperator.processElement}}, after 
{{MergingWindowSet.addWindow()}} computes the post-merge result 
({{actualWindow}}), two lines in the merging branch still reference the 
original pre-merge candidate window ({{window}}) instead of 
{{actualWindow}}:
{code:java}
outputCollector.setTimestamp(window.maxTimestamp());{code}
{code:java}
windowFunctionContext.setWindow(window);{code}
{{windowFunctionContext}} and {{outputCollector}} are both set from {{window}}, 
so the bug propagates to them, while the trigger context correctly references 
the actual (merged) window:
{code:java}
triggerContext.setWindow(actualWindow);{code}

*Impact*

When two session windows merge, the {{WindowContext}} exposed to the user 
function reports the bounds of the stale candidate window rather than those of 
the merged window, so any logic inside {{onRecord}} that reads 
{{windowContext.getStartTime()}} or {{getEndTime()}} operates on stale 
pre-merge data.

Also,
{code:java}
outputCollector.setTimestamp(window.maxTimestamp()){code}
assigns the candidate window's max timestamp to records emitted during 
{{onRecord}}. For a late record, all emitted records carry the incorrectly 
early timestamp of the candidate window instead of the merged window's.

*Reproduce*

1) Create a session window with an inactivity gap of 10s.
2) Process two records: R1 at timestamp 0s and R2 at timestamp 5s.

{{windowFunctionContext.setWindow(window)}} uses the candidate 
[5_000, 15_000) instead of {{actualWindow}} [0, 15_000), so {{getStartTime()}} 
returns 5_000 instead of 0.


> [flink-datastream] Merging-Window onRecord Callback Receives Wrong Window 
> Context in DataStream V2
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39932
>                 URL: https://issues.apache.org/jira/browse/FLINK-39932
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 2.1.3, 2.1.4
>            Reporter: Sameer Devgan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
