This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.25.0 by this push:
     new cc0197b  [BEAM-11034] Avoid build-up of stateful garbage collection timers for GlobalWindow.
     new 51cd5c0  Merge pull request #13056 from scwhittle/no_global_stateful_timers_release
cc0197b is described below

commit cc0197bf9287bb43219669283912278f419b862a
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Oct 7 00:50:01 2020 -0700

    [BEAM-11034] Avoid build-up of stateful garbage collection timers for GlobalWindow.
    
    Such cleanup timers are not needed on the GlobalWindow for DoFns without an
    @OnWindowExpiration method. It is a common pattern for users to use the global
    window for stateful processing to gain full control via state and timers.
    However, if this is done for stages with unbounded keyspaces, the GC timers
    build up indefinitely, because the GlobalWindow does not expire until the
    pipeline terminates and so the timers never fire.
---
 .../runners/dataflow/worker/SimpleParDoFn.java     | 24 +++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 2ebcf94..10dcf3c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -486,14 +487,23 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
     for (W window : windowsToCleanup) {
       // The stepContext is the thing that know if it is batch or streaming, 
hence
       // whether state needs to be cleaned up or will simply be discarded so 
the
-      // timer can be ignored
-
+      // timer can be ignored.
       Instant cleanupTime = earliestAllowableCleanupTime(window, 
windowingStrategy);
-      // if DoFn has OnWindowExpiration then set holds for system timer.
-      Instant cleanupOutputTimestamp =
-          fnSignature.onWindowExpiration() == null ? cleanupTime : 
cleanupTime.minus(1L);
-      stepContext.setStateCleanupTimer(
-          CLEANUP_TIMER_ID, window, windowCoder, cleanupTime, 
cleanupOutputTimestamp);
+      // Set a cleanup timer for state at the end of the window to trigger 
onWindowExpiration and
+      // garbage collect state. We avoid doing this for the global window if 
there is no window
+      // expiration set as the state will be up when the pipeline terminates. 
Setting the timer
+      // leads to a unbounded growth of timers for pipelines with many unique 
keys in the global
+      // window.
+      if (cleanupTime.isBefore(GlobalWindow.INSTANCE.maxTimestamp())
+          || fnSignature.onWindowExpiration() != null) {
+        // If the DoFn has OnWindowExpiration, then set the watermark hold so 
that the watermark
+        // does
+        // not advance until OnWindowExpiration completes.
+        Instant cleanupOutputTimestamp =
+            fnSignature.onWindowExpiration() == null ? cleanupTime : 
cleanupTime.minus(1L);
+        stepContext.setStateCleanupTimer(
+            CLEANUP_TIMER_ID, window, windowCoder, cleanupTime, 
cleanupOutputTimestamp);
+      }
     }
   }
 

Reply via email to