[
https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924386#comment-15924386
]
ASF GitHub Bot commented on FLINK-5713:
---------------------------------------
Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3535#discussion_r105931848
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
- // drop if the window is already late
- if (isLate(actualWindow)) {
-
mergingWindows.retireWindow(actualWindow);
- continue;
- }
+ context.key = key;
+ context.window = actualWindow;
W stateWindow =
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window
" + window + " is not in in-flight window set.");
}
windowState.setCurrentNamespace(stateWindow);
- windowState.add(element.getValue());
- context.key = key;
- context.window = actualWindow;
+ // Drop if the window is already late. In rare
cases (with a misbehaving
+ // WindowAssigner) it can happen that a window
becomes late that already has
+ // state (contents, state and timers). That's
why we first get the window state
+ // above and then drop everything.
+ if (isLate(actualWindow)) {
+ clearAllState(actualWindow,
windowState, mergingWindows);
+ mergingWindows.persist();
--- End diff --
Why move `mergingWindows.persist()` from `clearAllState` to here, And we
need not do the null check? How about
```
if (mergingWindows != null) {
mergingWindows.persist();
}
```
> Protect against NPE in WindowOperator window cleanup
> ----------------------------------------------------
>
> Key: FLINK-5713
> URL: https://issues.apache.org/jira/browse/FLINK-5713
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.2.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the
> merging window set while a cleanup timer is still active. This will trigger a
> NullPointerException when that timer fires.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)