Repository: incubator-beam Updated Branches: refs/heads/master 164676bc7 -> fba1259b4
Drop elements in closed windows before mapping window. Previously, the sequence was: (1) map a window to the representative of its equivalence class according to merging; (2) drop the element if that representative window was closed. But this crashes if the original window was already closed, because a closed window no longer has a representative. The new sequence is reversed: the element is first dropped if its window is closed, and only then is the window mapped to its representative. This is safe because a closed window can never be chosen as a representative, since it is no longer a candidate for merges. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27c6c795 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27c6c795 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27c6c795 Branch: refs/heads/master Commit: 27c6c795271e7a927ed0d07679ce9d6de300c38f Parents: 96e286f Author: Mark Shields <markshie...@google.com> Authored: Thu Mar 31 10:57:55 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Mar 31 19:17:42 2016 -0700 ---------------------------------------------------------------------- .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 29 ++++++++------- .../dataflow/sdk/util/ReduceFnRunnerTest.java | 38 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27c6c795/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index d62bcc9..2415dab 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -438,8 +438,22 @@ public class ReduceFnRunner<K, 
InputT, OutputT, W extends BoundedWindow> { for (BoundedWindow untypedWindow : value.getWindows()) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; + + ReduceFn<K, InputT, OutputT, W>.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.isClosed(directContext.state())) { + // This window has already been closed. + droppedDueToClosedWindow.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", + value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + continue; + } + W active = activeWindows.representative(window); - Preconditions.checkState(active != null, "Window %s should have been added", window); + Preconditions.checkState(active != null, "Window %s has no representative", window); windows.add(active); } @@ -450,24 +464,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { triggerRunner.prefetchForValue(window, directContext.state()); } - // Process the element for each (representative) window it belongs to. + // Process the element for each (representative, not closed) window it belongs to. for (W window : windows) { ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); - // Check to see if the triggerRunner thinks the window is closed. If so, drop that window. 
- if (triggerRunner.isClosed(directContext.state())) { - droppedDueToClosedWindow.addValue(1L); - WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", - value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - continue; - } - nonEmptyPanes.recordContent(renamedContext.state()); // Make sure we've scheduled the end-of-window or garbage collection timer for this window. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27c6c795/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index 10b886b..b58e360 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -725,6 +725,44 @@ public class ReduceFnRunnerTest { } /** + * If a later event tries to reuse an earlier session window which has been closed, we + * should reject that element and not fail due to the window no longer having a representative. + */ + @Test + public void testMergingWithReusedWindow() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), + ClosingBehavior.FIRE_IF_NON_EMPTY); + + // One element in one session window. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. 
+ + // Close the trigger, but the garbage collection timer is still pending. + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTrigger); + tester.advanceInputWatermark(new Instant(15)); + + // Another element in the same session window. + // Should be discarded with 'window closed'. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. + + // Now the garbage collection timer will fire, finding the trigger already closed. + tester.advanceInputWatermark(new Instant(100)); + + List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); + assertThat(output.size(), equalTo(1)); + assertThat(output.get(0), + isSingleWindowedValue(containsInAnyOrder(1), + 1, // timestamp + 1, // window start + 11)); // window end + assertThat( + output.get(0).getPane(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + } + + /** * Tests that when data is assigned to multiple windows but some of those windows have * had their triggers finish, then the data is dropped and counted accurately. */