This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 51bb63f  [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
     new 06ce34e  Merge pull request #14182 from [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
51bb63f is described below

commit 51bb63fc05441c7e9208407ecd5e172c009a269f
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Tue Mar 9 20:32:34 2021 -0800

    [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
---
 .../beam/fn/harness/WindowMergingFnRunner.java     |  9 ++++++++-
 .../beam/fn/harness/WindowMergingFnRunnerTest.java | 22 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
index edf0e0a..e7b169e 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 
 /**
@@ -154,7 +155,13 @@ public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
       for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
         currentWindows.removeAll(mergedWindow.getValue());
       }
-      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+      KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> result =
+          KV.of(
+              windowsToMerge.getKey(),
+              KV.of(Sets.newHashSet(currentWindows), (Iterable) Lists.newArrayList(mergedWindows)));
+      currentWindows.clear();
+      mergedWindows.clear();
+      return result;
     }
   }
 }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
index 9816ed6..359ea98 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
@@ -105,6 +105,28 @@ public class WindowMergingFnRunnerTest {
         Iterables.getOnlyElement(output.getValue().getValue());
     assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), mergedOutput.getKey());
     assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMerged));
+
+    // Process a new group of windows, make sure that previous result has been cleaned up.
+    BoundedWindow[] expectedToBeMergedGroup2 =
+        new BoundedWindow[] {
+          new IntervalWindow(new Instant(15L), new Instant(17L)),
+          new IntervalWindow(new Instant(16L), new Instant(18L))
+        };
+
+    input =
+        KV.of(
+            "abc",
+            ImmutableList.<BoundedWindow>builder()
+                .add(expectedToBeMergedGroup2)
+                .addAll(expectedToBeUnmerged)
+                .build());
+
+    output = mapFunction.apply(input);
+    assertEquals(input.getKey(), output.getKey());
+    assertEquals(expectedToBeUnmerged, output.getValue().getKey());
+    mergedOutput = Iterables.getOnlyElement(output.getValue().getValue());
+    assertEquals(new IntervalWindow(new Instant(15L), new Instant(18L)), mergedOutput.getKey());
+    assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMergedGroup2));
   }
 
   private static <W extends BoundedWindow> RunnerApi.PTransform createMergeTransformForWindowFn(