Sameer Devgan created FLINK-39932:
-------------------------------------
Summary: [flink-datastream] Merging-Window onRecord Callback
Receives Wrong Window Context in DataStream V2
Key: FLINK-39932
URL: https://issues.apache.org/jira/browse/FLINK-39932
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 2.1.3, 2.1.4
Reporter: Sameer Devgan
In {{OneInputWindowProcessOperator.processElement}}, after
{{MergingWindowSet.addWindow()}} computes the post-merge result
({{actualWindow}}), two lines in the merging branch still reference the
original pre-merge candidate window ({{window}}) instead of
{{actualWindow}}:
{code:java}
outputCollector.setTimestamp(window.maxTimestamp());{code}
{code:java}
windowFunctionContext.setWindow(window);{code}
While the trigger context correctly references {{actualWindow}} (the merged
window), {{windowFunctionContext}} and {{outputCollector}} are set from
{{window}}, so the stale candidate window propagates to both. The trigger
context is set with:
{code:java}
triggerContext.setWindow(actualWindow);{code}
*Impact*
When two session windows merge, the {{WindowContext}} exposed to the user
function reports the stale candidate window's bounds instead of the merged
window's bounds, so any logic inside {{onRecord}} that reads
{{windowContext.getStartTime()}} or {{getEndTime()}} operates on stale,
pre-merge data.
Also, the line
{code:java}
outputCollector.setTimestamp(window.maxTimestamp());{code}
assigns the candidate window's max timestamp to records emitted during
{{onRecord}}. For a late record, whose candidate window ends before the window
it merges into, all emitted records therefore carry an incorrectly early
timestamp: the candidate window's {{maxTimestamp()}} rather than the merged
window's.
*Steps to reproduce*
1) Create a session window with an inactivity gap of 10s.
2) Process two records: R1 at timestamp 0s and R2 at timestamp 5s.
When R2 is processed, {{windowFunctionContext.setWindow(window)}} uses the
candidate window {{[5_000, 15_000)}} instead of {{actualWindow}}
{{[0, 15_000)}}, so {{getStartTime()}} returns 5_000 instead of 0.
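The reproduction can be modeled outside Flink with the following sketch. It is hypothetical: {{SessionMergeRepro}}, {{candidateFor}} and {{addWindow}} are illustrative names, and the single-pass merge only mimics the effect of {{MergingWindowSet.addWindow()}} under assumed {{TimeWindow}} semantics (end-exclusive windows, session window {{[t, t + gap)}}, intersecting windows merge). It is not the actual implementation.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained model of the reproduction; not Flink code.
final class SessionMergeRepro {

    /** Minimal stand-in for a time window [start, end) with an exclusive end. */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Overlap test; windows that merely touch also count as intersecting. */
        boolean intersects(Win other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest window that covers both this window and other. */
        Win cover(Win other) {
            return new Win(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    /** Session inactivity gap from the report: 10s. */
    static final long GAP_MS = 10_000L;

    /** Session windows currently held for one key; they never intersect each other. */
    private final List<Win> sessions = new ArrayList<>();

    /** Candidate session window for a record at time t (assumed to be [t, t + gap)). */
    static Win candidateFor(long t) {
        return new Win(t, t + GAP_MS);
    }

    /**
     * Adds a candidate and returns the post-merge window (the "actualWindow").
     * One pass is enough because the held sessions never intersect each other.
     */
    Win addWindow(Win candidate) {
        Win merged = candidate;
        List<Win> kept = new ArrayList<>();
        for (Win w : sessions) {
            if (w.intersects(merged)) {
                merged = merged.cover(w); // absorb the overlapping session
            } else {
                kept.add(w);
            }
        }
        kept.add(merged);
        sessions.clear();
        sessions.addAll(kept);
        return merged;
    }

    public static void main(String[] args) {
        SessionMergeRepro key = new SessionMergeRepro();
        key.addWindow(candidateFor(0L));          // R1 at 0s -> [0, 10000)
        Win window = candidateFor(5_000L);        // R2 at 5s -> candidate [5000, 15000)
        Win actualWindow = key.addWindow(window); // merged -> [0, 15000)

        // windowFunctionContext.setWindow(window) would expose this start time:
        System.out.println("stale getStartTime() = " + window.start);          // 5000
        // windowFunctionContext.setWindow(actualWindow) would expose this one:
        System.out.println("expected getStartTime() = " + actualWindow.start); // 0
    }
}
{code}

Reading the start from {{window}} reproduces the stale value (5000), while {{actualWindow}} gives the expected 0, which is what passing {{actualWindow}} to {{windowFunctionContext.setWindow(...)}} would expose.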
--
This message was sent by Atlassian Jira
(v8.20.10#820010)