Repository: incubator-beam
Updated Branches:
  refs/heads/master 1aee01753 -> 6b408b2a3


Clear finished bits from merged-away windows, not merge result window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99c0fa79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99c0fa79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99c0fa79

Branch: refs/heads/master
Commit: 99c0fa794e4a99121a668fc0becb5ae9ead68dfc
Parents: f2f5129
Author: Mark Shields <markshie...@google.com>
Authored: Mon Apr 4 13:31:12 2016 -0700
Committer: Mark Shields <markshie...@google.com>
Committed: Tue Apr 5 13:11:07 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/util/TriggerRunner.java  | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c0fa79/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
index 7f81a9b..aa7e101 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
@@ -84,6 +84,15 @@ public class TriggerRunner<W extends BoundedWindow> {
             : FinishedTriggersBitSet.fromBitSet(bitSet);
   }
 
+
+  private void clearFinishedBits(ValueState<BitSet> state) {
+    if (!isFinishedSetNeeded()) {
+      // Nothing to clear.
+      return;
+    }
+    state.clear();
+  }
+
   /** Return true if the trigger is closed in the window corresponding to the specified state. */
   public boolean isClosed(StateAccessor<?> state) {
     return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
@@ -151,6 +160,8 @@ public class TriggerRunner<W extends BoundedWindow> {
         state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
       // Don't need to clone these, since the trigger context doesn't allow modification
       builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
+      // Clear the underlying finished bits.
+      clearFinishedBits(entry.getValue());
     }
     ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
 
@@ -161,9 +172,6 @@ public class TriggerRunner<W extends BoundedWindow> {
     rootTrigger.invokeOnMerge(mergeContext);
 
     persistFinishedSet(state, finishedSet);
-
-    // Clear the finished bits.
-    clearFinished(state);
   }
 
   public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
@@ -201,12 +209,10 @@ public class TriggerRunner<W extends BoundedWindow> {
   }
 
   /**
-   * Clear finished bits.
+   * Clear the finished bits.
    */
   public void clearFinished(StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).clear();
-    }
+    clearFinishedBits(state.access(FINISHED_BITS_TAG));
   }
 
   /**

Reply via email to