This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 51bb63f  [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
     new 06ce34e  Merge pull request #14182 from [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
51bb63f is described below

commit 51bb63fc05441c7e9208407ecd5e172c009a269f
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Tue Mar 9 20:32:34 2021 -0800

    [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
---
 .../beam/fn/harness/WindowMergingFnRunner.java     |  9 ++++++++-
 .../beam/fn/harness/WindowMergingFnRunnerTest.java | 22 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
index edf0e0a..e7b169e 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
 import org.apache.beam.sdk.values.KV;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 
 /**
@@ -154,7 +155,13 @@ public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
       for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
         currentWindows.removeAll(mergedWindow.getValue());
       }
-      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) 
mergedWindows));
+      KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> result =
+          KV.of(
+              windowsToMerge.getKey(),
+              KV.of(Sets.newHashSet(currentWindows), (Iterable) 
Lists.newArrayList(mergedWindows)));
+      currentWindows.clear();
+      mergedWindows.clear();
+      return result;
     }
   }
 }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
index 9816ed6..359ea98 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
@@ -105,6 +105,28 @@ public class WindowMergingFnRunnerTest {
         Iterables.getOnlyElement(output.getValue().getValue());
     assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), 
mergedOutput.getKey());
     assertThat(mergedOutput.getValue(), 
containsInAnyOrder(expectedToBeMerged));
+
+    // Process a new group of windows, make sure that previous result has been 
cleaned up.
+    BoundedWindow[] expectedToBeMergedGroup2 =
+        new BoundedWindow[] {
+          new IntervalWindow(new Instant(15L), new Instant(17L)),
+          new IntervalWindow(new Instant(16L), new Instant(18L))
+        };
+
+    input =
+        KV.of(
+            "abc",
+            ImmutableList.<BoundedWindow>builder()
+                .add(expectedToBeMergedGroup2)
+                .addAll(expectedToBeUnmerged)
+                .build());
+
+    output = mapFunction.apply(input);
+    assertEquals(input.getKey(), output.getKey());
+    assertEquals(expectedToBeUnmerged, output.getValue().getKey());
+    mergedOutput = Iterables.getOnlyElement(output.getValue().getValue());
+    assertEquals(new IntervalWindow(new Instant(15L), new Instant(18L)), 
mergedOutput.getKey());
+    assertThat(mergedOutput.getValue(), 
containsInAnyOrder(expectedToBeMergedGroup2));
   }
 
   private static <W extends BoundedWindow> RunnerApi.PTransform 
createMergeTransformForWindowFn(

Reply via email to