Repository: incubator-beam Updated Branches: refs/heads/master 1aee01753 -> 6b408b2a3
Clear finished bits from merged-away windows, not merge result window. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99c0fa79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99c0fa79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99c0fa79 Branch: refs/heads/master Commit: 99c0fa794e4a99121a668fc0becb5ae9ead68dfc Parents: f2f5129 Author: Mark Shields <markshie...@google.com> Authored: Mon Apr 4 13:31:12 2016 -0700 Committer: Mark Shields <markshie...@google.com> Committed: Tue Apr 5 13:11:07 2016 -0700 ---------------------------------------------------------------------- .../cloud/dataflow/sdk/util/TriggerRunner.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c0fa79/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index 7f81a9b..aa7e101 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -84,6 +84,15 @@ public class TriggerRunner<W extends BoundedWindow> { : FinishedTriggersBitSet.fromBitSet(bitSet); } + + private void clearFinishedBits(ValueState<BitSet> state) { + if (!isFinishedSetNeeded()) { + // Nothing to clear. + return; + } + state.clear(); + } + /** Return true if the trigger is closed in the window corresponding to the specified state. */ public boolean isClosed(StateAccessor<?> state) { return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); @@ -151,6 +160,8 @@ public class TriggerRunner<W extends BoundedWindow> { state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) { // Don't need to clone these, since the trigger context doesn't allow modification builder.put(entry.getKey(), readFinishedBits(entry.getValue())); + // Clear the underlying finished bits. + clearFinishedBits(entry.getValue()); } ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build(); @@ -161,9 +172,6 @@ public class TriggerRunner<W extends BoundedWindow> { rootTrigger.invokeOnMerge(mergeContext); persistFinishedSet(state, finishedSet); - - // Clear the finished bits. - clearFinished(state); } public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception { @@ -201,12 +209,10 @@ public class TriggerRunner<W extends BoundedWindow> { } /** - * Clear finished bits. + * Clear the finished bits. */ public void clearFinished(StateAccessor<?> state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).clear(); - } + clearFinishedBits(state.access(FINISHED_BITS_TAG)); } /**