This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.25.0 by this push:
new cc0197b [BEAM-11034] Avoid build-up of stateful garbage collection
timers for GlobalWindow.
new 51cd5c0 Merge pull request #13056 from
scwhittle/no_global_stateful_timers_release
cc0197b is described below
commit cc0197bf9287bb43219669283912278f419b862a
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Oct 7 00:50:01 2020 -0700
[BEAM-11034] Avoid build-up of stateful garbage collection timers for
GlobalWindow.
Such cleanup timers are not needed in the GlobalWindow for DoFns without
@OnWindowExpiration. It is a common pattern for users to use the global
window for stateful processing, relying on state and timers for full control.
However, if this is done for stages with unbounded keyspaces, the garbage
collection timers build up indefinitely.
---
.../runners/dataflow/worker/SimpleParDoFn.java | 24 +++++++++++++++-------
1 file changed, 17 insertions(+), 7 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 2ebcf94..10dcf3c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
@@ -486,14 +487,23 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
for (W window : windowsToCleanup) {
// The stepContext is the thing that know if it is batch or streaming,
hence
// whether state needs to be cleaned up or will simply be discarded so
the
- // timer can be ignored
-
+ // timer can be ignored.
Instant cleanupTime = earliestAllowableCleanupTime(window,
windowingStrategy);
- // if DoFn has OnWindowExpiration then set holds for system timer.
- Instant cleanupOutputTimestamp =
- fnSignature.onWindowExpiration() == null ? cleanupTime :
cleanupTime.minus(1L);
- stepContext.setStateCleanupTimer(
- CLEANUP_TIMER_ID, window, windowCoder, cleanupTime,
cleanupOutputTimestamp);
+ // Set a cleanup timer for state at the end of the window to trigger onWindowExpiration and
+ // garbage collect state. We avoid doing this for the global window if there is no window
+ // expiration set, as the state will be cleaned up when the pipeline terminates. Setting the timer
+ // leads to an unbounded growth of timers for pipelines with many unique keys in the global
+ // window.
+ if (cleanupTime.isBefore(GlobalWindow.INSTANCE.maxTimestamp())
+ || fnSignature.onWindowExpiration() != null) {
+ // If the DoFn has OnWindowExpiration, then set the watermark hold so that the watermark
+ // does not advance until OnWindowExpiration completes.
+ Instant cleanupOutputTimestamp =
+ fnSignature.onWindowExpiration() == null ? cleanupTime :
cleanupTime.minus(1L);
+ stepContext.setStateCleanupTimer(
+ CLEANUP_TIMER_ID, window, windowCoder, cleanupTime,
cleanupOutputTimestamp);
+ }
}
}