Fix overflow in ReduceFnRunner garbage collection times
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f7a2ab4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f7a2ab4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f7a2ab4

Branch: refs/heads/python-sdk
Commit: 4f7a2ab47c5fdd9b3de5f091a40128e68ddd11a3
Parents: 5bf732c
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Jun 14 16:10:09 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:30 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f7a2ab4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 34208da..864e8e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -936,16 +936,21 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
-   * that will be the end of the window. Otherwise, add the allowed lateness to the end of
-   * the window.
+   * Return when {@code window} should be garbage collected. If the window's expiration time is on
+   * or after the end of the global window, it will be truncated to the end of the global window.
    */
   private Instant garbageCollectionTime(W window) {
-    Instant maxTimestamp = window.maxTimestamp();
-    if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
-      return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+
+    // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
+    // global window, then we truncate it. The conditional is phrased like it is because the
+    // addition of EOW + allowed lateness might even overflow the maximum allowed Instant
+    if (GlobalWindow.INSTANCE
+        .maxTimestamp()
+        .minus(windowingStrategy.getAllowedLateness())
+        .isBefore(window.maxTimestamp())) {
+      return GlobalWindow.INSTANCE.maxTimestamp();
     } else {
-      return maxTimestamp;
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     }
   }