Repository: incubator-beam
Updated Branches:
  refs/heads/master 27abf446b -> 68b8cbc81


Factor toBeMerged->mergeResult map out of MergingActiveWindowSet


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d534ac50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d534ac50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d534ac50

Branch: refs/heads/master
Commit: d534ac503a8fcd7e6c2e6eeef0e7da0e48b98d2a
Parents: 27abf44
Author: Mark Shields <markshie...@google.com>
Authored: Mon May 23 17:06:48 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon May 23 21:04:51 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ActiveWindowSet.java   | 109 ++++---
 .../beam/sdk/util/MergingActiveWindowSet.java   | 294 +++++--------------
 .../sdk/util/NonMergingActiveWindowSet.java     |  18 +-
 .../apache/beam/sdk/util/ReduceFnRunner.java    | 190 ++++++------
 .../sdk/util/MergingActiveWindowSetTest.java    | 220 ++++++++++----
 .../org/apache/beam/sdk/util/TriggerTester.java |  26 +-
 6 files changed, 416 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
index e1ab9e9..02c12c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -26,44 +25,40 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
 /**
- * Track which active windows have their state associated with merged-away 
windows.
+ * Track which windows are <i>active</i>, and the <i>state address 
window(s)</i> under which their
+ * state is stored. Also help with the multi-step process of merging windows 
and their associated
+ * state.
+ *
+ * <p>When windows are merged we must also merge their state. For example, we 
may need to
+ * concatenate buffered elements, sum a count of elements, or find a new 
minimum timestamp.
+ * If we start with two windows {@code Wa} and {@code Wb} and later discover 
they should be
+ * merged into window {@code Wab} then, naively, we must copy and merge the 
states of {@code Wa}
+ * and {@code Wab} into {@code Wab}.
  *
- * When windows are merged we must track which state previously associated 
with the merged windows
- * must now be associated with the result window. Some of that state may be 
combined eagerly when
- * the windows are merged. The rest is combined lazily when the final state is 
actually
- * required when emitting a pane. We keep track of this using an {@link 
ActiveWindowSet}.
+ * <p>However, the common case for merging windows is for a new window to be 
merged into an
+ * existing window. Thus, if {@code Wa} is the existing window and {@code Wb} 
the new window it
+ * is more efficient to leave the state for {@code Wa} where it is, and simply 
redirect {@code
+ * Wab} to it. In this case we say {@code Wab} has a state address window of 
{@code Wa}.
  *
- * <p>An {@link ActiveWindowSet} considers a window to be in one of the 
following states:
+ * <p>Even if windows {@code Wa} and {@code Wb} already have state, it can 
still be more efficient
+ * to append the state of {@code Wb} onto {@code Wa} rather than copy the 
state from {@code Wa}
+ * and {@code Wb} into {@code Wab}.
  *
+ * <p>We use the following terminology for windows:
  * <ol>
- *   <li><b>NEW</b>: The initial state for a window on an incoming element; we 
do not yet know
- *       if it should be merged into an ACTIVE window, or whether it is 
already present as an
- *       ACTIVE window, since we have not yet called
- *       {@link WindowFn#mergeWindows}.</li>
- *   <li><b>ACTIVE</b>: A window that has state associated with it and has not 
itself been merged
- *       away. The window may have one or more <i>state address</i> windows 
under which its
- *       non-empty state is stored. A state value for an ACTIVE window must be 
derived by reading
- *       the state in all of its state address windows.</li>
- *   <li><b>EPHEMERAL</b>: A NEW window that has been merged into an ACTIVE 
window before any state
- *       has been associated with that window. Thus the window is neither 
ACTIVE nor MERGED. These
- *       windows are not persistently represented since if they reappear the 
merge function should
- *       again redirect them to an ACTIVE window. EPHEMERAL windows are an 
optimization for
- *       the common case of in-order events and {@link Sessions session 
window} by never associating
- *       state with windows that are created and immediately merged away.</li>
- *   <li><b>MERGED</b>: An ACTIVE window has been merged into another ACTIVE 
window after it had
- *       state associated with it. The window will thus appear as a state 
address window for exactly
- *       one ACTIVE window.</li>
- *   <li><b>EXPIRED</b>: The window has expired and may have been garbage 
collected. No new elements
- *       (even late elements) will ever be assigned to that window. These 
windows are not explicitly
- *       represented anywhere; it is expected that the user of {@link 
ActiveWindowSet} will store
- *       no state associated with the window.</li>
+ * <li><b>ACTIVE</b>: A window that has state associated with it and has not 
itself been merged
+ * away. The window may have one (or more) state address windows under which 
its
+ * non-empty state is stored. A state value for an ACTIVE window must be 
derived by reading
+ * the state in (all of) its state address windows. Note that only pre 1.4 
pipelines
+ * use multiple state address windows per active window. From 1.4 onwards we 
eagerly merge
+ * window state into a single state address window.
+ * <li><b>NEW</b>: The initial state for a window of an incoming element which 
is not
+ * already ACTIVE. We have not yet called {@link WindowFn#mergeWindows}, and 
so don't yet know
+ * whether the window will be be merged into another NEW or ACTIVE window, or 
will
+ * become an ACTIVE window in its own right.
  * </ol>
  *
- * <p>
- *
  * <p>If no windows will ever be merged we can use the trivial implementation 
{@link
  * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this 
data structure is in
  * {@link MergingActiveWindowSet}.
@@ -78,27 +73,25 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
     /**
      * Called when windows are about to be merged, but before any {@link 
#onMerge} callback
      * has been made.
+     *
+     * @param toBeMerged  the windows about to be merged.
+     * @param mergeResult the result window, either a member of {@code 
toBeMerged} or new.
      */
-    void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> 
activeToBeMerged, W mergeResult)
-        throws Exception;
+    void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception;
 
     /**
      * Called when windows are about to be merged, after all {@link 
#prefetchOnMerge} calls
      * have been made, but before the active window set has been updated to 
reflect the merge.
      *
-     * @param toBeMerged the windows about to be merged.
-     * @param activeToBeMerged the subset of {@code toBeMerged} corresponding 
to windows which
-     * are currently ACTIVE (and about to be merged). The remaining windows 
have been deemed
-     * EPHEMERAL, and thus have no state associated with them.
+     * @param toBeMerged  the windows about to be merged.
      * @param mergeResult the result window, either a member of {@code 
toBeMerged} or new.
      */
-    void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W 
mergeResult)
-        throws Exception;
+    void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception;
   }
 
   /**
-   * Remove EPHEMERAL windows and remaining NEW windows since we only need to 
know about them
-   * while processing new elements.
+   * Remove any remaining NEW windows since they were not promoted to being 
ACTIVE
+   * by {@link #ensureWindowIsActive} and we don't need to record anything 
about them.
    */
   void cleanupTemporaryWindows();
 
@@ -108,42 +101,40 @@ public interface ActiveWindowSet<W extends BoundedWindow> 
{
   void persist();
 
   /**
-   * Return the ACTIVE window into which {@code window} has been merged.
-   * Return {@code window} itself if it is ACTIVE. Return null if {@code 
window} has not
-   * yet been seen.
+   * Return (a view of) the set of currently ACTIVE and NEW windows.
    */
-  @Nullable
-  W mergeResultWindow(W window);
+  Set<W> getActiveAndNewWindows();
 
   /**
-   * Return (a view of) the set of currently ACTIVE windows.
+   * Return {@code true} if {@code window} is ACTIVE.
    */
-  Set<W> getActiveWindows();
+  boolean isActive(W window);
 
   /**
-   * Return {@code true} if {@code window} is ACTIVE.
+   * Return {@code true} if {@code window} is ACTIVE or NEW.
    */
-  boolean isActive(W window);
+  boolean isActiveOrNew(W window);
 
   /**
    * Called when an incoming element indicates it is a member of {@code 
window}, but before we
    * have started processing that element. If {@code window} is not already 
known to be ACTIVE,
-   * MERGED or EPHEMERAL then add it as NEW.
+   * then add it as NEW.
    */
   void ensureWindowExists(W window);
 
   /**
    * Called when a NEW or ACTIVE window is now known to be ACTIVE.
-   * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only 
state address window).
+   * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only 
state address
+   * window).
    */
   void ensureWindowIsActive(W window);
 
   /**
-   * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL 
then add it
-   * as ACTIVE.
+   * If {@code window} is not already known to be ACTIVE then add it as ACTIVE.
+   * For testing only.
    */
   @VisibleForTesting
-  void addActive(W window);
+  void addActiveForTesting(W window);
 
   /**
    * Remove {@code window} from the set.
@@ -152,9 +143,9 @@ public interface ActiveWindowSet<W extends BoundedWindow> {
 
   /**
    * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated 
with this window set,
-   * merging as many of the active windows as possible. {@code mergeCallback} 
will be invoked for
-   * each group of windows that are merged. After this no NEW windows will 
remain, all merge
-   * result windows will be ACTIVE, and all windows which have been merged 
away will not be ACTIVE.
+   * merging as many of the NEW and ACTIVE windows as possible. {@code 
mergeCallback} will be
+   * invoked for each group of windows that are merged. After this all merge 
result windows will
+   * be ACTIVE, and all windows which have been merged away will be neither 
ACTIVE nor NEW.
    */
   void merge(MergeCallback<W> mergeCallback) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
index 13c1e34..07e47aa 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.state.ValueState;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -39,71 +40,30 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 import javax.annotation.Nullable;
 
 /**
  * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
- * <p>
- * <p>The underlying notion of {@link MergingActiveWindowSet} is that of 
representing equivalence
- * classes of merged windows as a mapping from the merged "super-window" to a 
set of
- * <i>state address</i> windows in which some state has been persisted. The 
mapping need not
- * contain EPHEMERAL windows, because they are created and merged without any 
persistent state.
- * Each window must be a state address window for at most one window, so the 
mapping is
- * invertible.
- * <p>
- * <p>The states of a non-expired window are treated as follows:
- * <p>
- * <ul>
- * <li><b>NEW</b>: a NEW has an empty set of associated state address 
windows.</li>
- * <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty 
set of state
- * address windows. If the window has not merged, this will necessarily be the 
singleton set
- * containing just itself, but it is not required that an ACTIVE window be 
amongst its
- * state address windows.</li>
- * <li><b>MERGED</b>: a MERGED window will be in the set of associated windows 
for some
- * other window - that window is retrieved via {@link #mergeResultWindow} 
(this reverse
- * association is implemented in O(1) time).</li>
- * <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked 
transiently;
- * an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by 
a call
- * to {@link #recordMerge} prior to any request for a {@link 
#mergeResultWindow}.</li>
- * </ul>
- * <p>
- * <p>To illustrate why an ACTIVE window need not be amongst its own state 
address windows,
- * consider two active windows W1 and W2 that are merged to form W12. Further 
writes may be
- * applied to either of W1 or W2, since a read of W12 implies reading both of 
W12 and merging
- * their results. Hence W12 need not have state directly associated with it.
  */
 public class MergingActiveWindowSet<W extends BoundedWindow> implements 
ActiveWindowSet<W> {
   private final WindowFn<Object, W> windowFn;
 
   /**
    * Map ACTIVE and NEW windows to their state address windows. Persisted.
-   */
-  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
-
-  /**
-   * As above, but only for EPHEMERAL windows. Does not need to be persisted.
-   */
-  private final Map<W, Set<W>> activeWindowToEphemeralWindows;
-
-  /**
-   * A map from window to the ACTIVE window it has been merged into. Does not 
need to be persisted.
-   * <p>
    * <ul>
-   * <li>Key window may be ACTIVE, MERGED or EPHEMERAL.
-   * <li>ACTIVE windows map to themselves.
-   * <li>If W1 maps to W2 then W2 is in {@link 
#activeWindowToStateAddressWindows}.
-   * <li>If W1 = W2 then W1 is ACTIVE. If W1 is in the state address window 
set for W2 then W1 is
-   * MERGED. Otherwise W1 is EPHEMERAL.
+   * <li>A NEW window has the empty set as its value.
+   * <li>An ACTIVE window has its (typically singleton) set of state address 
windows as
+   * its value.
    * </ul>
    */
-  private final Map<W, W> windowToActiveWindow;
+  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
 
   /**
    * Deep clone of {@link #activeWindowToStateAddressWindows} as of last 
commit.
-   * <p>
-   * <p>Used to avoid writing to state if no changes have been made during the 
work unit.
+   * Used to avoid writing to state if no changes have been made during the 
work unit.
    */
   private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
 
@@ -115,42 +75,32 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
   public MergingActiveWindowSet(WindowFn<Object, W> windowFn, 
StateInternals<?> state) {
     this.windowFn = windowFn;
 
-    StateTag<Object, ValueState<Map<W, Set<W>>>> mergeTreeAddr =
+    StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
         StateTags.makeSystemTagInternal(StateTags.value(
             "tree", MapCoder.of(windowFn.windowCoder(), 
SetCoder.of(windowFn.windowCoder()))));
-    valueState = state.state(StateNamespaces.global(), mergeTreeAddr);
-    // Little use trying to prefetch this state since the ReduceFnRunner is 
stymied until it is
-    // available.
+    valueState = state.state(StateNamespaces.global(), tag);
+    // Little use trying to prefetch this state since the ReduceFnRunner
+    // is stymied until it is available.
     activeWindowToStateAddressWindows = emptyIfNull(valueState.read());
-    activeWindowToEphemeralWindows = new HashMap<>();
     originalActiveWindowToStateAddressWindows = 
deepCopy(activeWindowToStateAddressWindows);
-    windowToActiveWindow = invert(activeWindowToStateAddressWindows);
   }
 
   @Override
   public void cleanupTemporaryWindows() {
-    // All NEW windows can be forgotten.
-    Iterator<Map.Entry<W, Set<W>>> iter =
-        activeWindowToStateAddressWindows.entrySet().iterator();
+    // All NEW windows can be forgotten since they must have ended up being 
merged into
+    // some other ACTIVE window.
+    Iterator<Map.Entry<W, Set<W>>> iter = 
activeWindowToStateAddressWindows.entrySet().iterator();
     while (iter.hasNext()) {
       Map.Entry<W, Set<W>> entry = iter.next();
       if (entry.getValue().isEmpty()) {
-        windowToActiveWindow.remove(entry.getKey());
         iter.remove();
       }
     }
-
-    // All EPHEMERAL windows can be forgotten.
-    for (Map.Entry<W, Set<W>> entry : 
activeWindowToEphemeralWindows.entrySet()) {
-      for (W ephemeral : entry.getValue()) {
-        windowToActiveWindow.remove(ephemeral);
-      }
-    }
-    activeWindowToEphemeralWindows.clear();
   }
 
   @Override
   public void persist() {
+    checkInvariants();
     if (activeWindowToStateAddressWindows.isEmpty()) {
       // Force all persistent state to disappear.
       valueState.clear();
@@ -160,42 +110,32 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
       // No change.
       return;
     }
-    // All NEW windows must have been accounted for.
-    for (Map.Entry<W, Set<W>> entry : 
activeWindowToStateAddressWindows.entrySet()) {
-      Preconditions.checkState(
-          !entry.getValue().isEmpty(), "Cannot persist NEW window %s", 
entry.getKey());
-    }
-    // Should be no EPHEMERAL windows.
-    Preconditions.checkState(
-        activeWindowToEphemeralWindows.isEmpty(), "Unexpected EPHEMERAL 
windows before persist");
-
     valueState.write(activeWindowToStateAddressWindows);
     // No need to update originalActiveWindowToStateAddressWindows since this 
object is about to
     // become garbage.
   }
 
   @Override
-  @Nullable
-  public W mergeResultWindow(W window) {
-    return windowToActiveWindow.get(window);
+  public Set<W> getActiveAndNewWindows() {
+    return activeWindowToStateAddressWindows.keySet();
   }
 
   @Override
-  public Set<W> getActiveWindows() {
-    return activeWindowToStateAddressWindows.keySet();
+  public boolean isActive(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    return stateAddressWindows != null && !stateAddressWindows.isEmpty();
   }
 
   @Override
-  public boolean isActive(W window) {
+  public boolean isActiveOrNew(W window) {
     return activeWindowToStateAddressWindows.containsKey(window);
   }
 
   @Override
   public void ensureWindowExists(W window) {
-    if (!windowToActiveWindow.containsKey(window)) {
-      
Preconditions.checkState(!activeWindowToStateAddressWindows.containsKey(window));
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      // Add window as NEW.
       activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
-      windowToActiveWindow.put(window, window);
     }
   }
 
@@ -203,48 +143,40 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
   public void ensureWindowIsActive(W window) {
     Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
     Preconditions.checkState(stateAddressWindows != null,
-        "Cannot ensure window %s is active since it is neither ACTIVE nor NEW",
-        window);
+                             "Cannot ensure window %s is active since it is 
neither ACTIVE nor NEW",
+                             window);
     if (stateAddressWindows.isEmpty()) {
-      // Window was NEW, make it ACTIVE.
-      Preconditions.checkState(windowToActiveWindow.containsKey(window)
-                               && 
windowToActiveWindow.get(window).equals(window));
+      // Window was NEW, make it ACTIVE with itself as its state address 
window.
       stateAddressWindows.add(window);
     }
   }
 
   @Override
   @VisibleForTesting
-  public void addActive(W window) {
-    if (!windowToActiveWindow.containsKey(window)) {
+  public void addActiveForTesting(W window) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      // Make window ACTIVE with itself as its state address window.
       Set<W> stateAddressWindows = new LinkedHashSet<>();
       stateAddressWindows.add(window);
       activeWindowToStateAddressWindows.put(window, stateAddressWindows);
-      windowToActiveWindow.put(window, window);
+    }
+  }
+
+  @VisibleForTesting
+  public void addActiveForTesting(W window, Iterable<W> stateAddressWindows) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      activeWindowToStateAddressWindows.put(window, 
Sets.newLinkedHashSet(stateAddressWindows));
     }
   }
 
   @Override
   public void remove(W window) {
-    Set<W> stateAddressWindows = 
activeWindowToStateAddressWindows.remove(window);
-    if (stateAddressWindows != null) {
-      for (W stateAddressWindow : stateAddressWindows) {
-        windowToActiveWindow.remove(stateAddressWindow);
-      }
-    }
-    Set<W> ephemeralWindows = activeWindowToEphemeralWindows.remove(window);
-    if (ephemeralWindows != null) {
-      for (W ephemeralWindow : ephemeralWindows) {
-        windowToActiveWindow.remove(ephemeralWindow);
-      }
-    }
-    windowToActiveWindow.remove(window);
+    activeWindowToStateAddressWindows.remove(window);
   }
 
   private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
     private MergeCallback<W> mergeCallback;
     private final List<Collection<W>> allToBeMerged;
-    private final List<Collection<W>> allActiveToBeMerged;
     private final List<W> allMergeResults;
     private final Set<W> seen;
 
@@ -252,7 +184,6 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
       windowFn.super();
       this.mergeCallback = mergeCallback;
       allToBeMerged = new ArrayList<>();
-      allActiveToBeMerged = new ArrayList<>();
       allMergeResults = new ArrayList<>();
       seen = new HashSet<>();
     }
@@ -268,12 +199,11 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
       Preconditions.checkNotNull(toBeMerged);
       Preconditions.checkNotNull(mergeResult);
       List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size());
-      List<W> activeToBeMerged = new ArrayList<>(toBeMerged.size());
       boolean includesMergeResult = false;
       for (W window : toBeMerged) {
         Preconditions.checkNotNull(window);
         Preconditions.checkState(
-            isActive(window), "Expecting merge window %s to be active", 
window);
+            isActiveOrNew(window), "Expecting merge window %s to be ACTIVE or 
NEW", window);
         if (window.equals(mergeResult)) {
           includesMergeResult = true;
         }
@@ -281,31 +211,24 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
         Preconditions.checkState(
             notDup, "Expecting merge window %s to appear in at most one merge 
set", window);
         copyOfToBeMerged.add(window);
-        if (!activeWindowToStateAddressWindows.get(window).isEmpty()) {
-          activeToBeMerged.add(window);
-        }
       }
       if (!includesMergeResult) {
         Preconditions.checkState(
-            !isActive(mergeResult), "Expecting result window %s to be new", 
mergeResult);
+            !isActive(mergeResult), "Expecting result window %s to be NEW", 
mergeResult);
       }
       allToBeMerged.add(copyOfToBeMerged);
-      allActiveToBeMerged.add(activeToBeMerged);
       allMergeResults.add(mergeResult);
     }
 
     public void recordMerges() throws Exception {
       for (int i = 0; i < allToBeMerged.size(); i++) {
-        mergeCallback.prefetchOnMerge(
-            allToBeMerged.get(i), allActiveToBeMerged.get(i), 
allMergeResults.get(i));
+        mergeCallback.prefetchOnMerge(allToBeMerged.get(i), 
allMergeResults.get(i));
       }
       for (int i = 0; i < allToBeMerged.size(); i++) {
-        mergeCallback.onMerge(
-            allToBeMerged.get(i), allActiveToBeMerged.get(i), 
allMergeResults.get(i));
+        mergeCallback.onMerge(allToBeMerged.get(i), allMergeResults.get(i));
         recordMerge(allToBeMerged.get(i), allMergeResults.get(i));
       }
       allToBeMerged.clear();
-      allActiveToBeMerged.clear();
       allMergeResults.clear();
       seen.clear();
     }
@@ -330,6 +253,11 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
    * the active window set.
    */
   private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+    // Note that mergedWriteStateAddress must predict the result of 
writeStateAddress
+    // after the corresponding merge has been applied.
+    // Thus we must ensure the first state address window in the merged result 
here is
+    // deterministic.
+    // Thus we use a linked hash set.
     Set<W> newStateAddressWindows = new LinkedHashSet<>();
     Set<W> existingStateAddressWindows = 
activeWindowToStateAddressWindows.get(mergeResult);
     if (existingStateAddressWindows != null) {
@@ -337,70 +265,35 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
       newStateAddressWindows.addAll(existingStateAddressWindows);
     }
 
-    Set<W> newEphemeralWindows = new HashSet<>();
-    Set<W> existingEphemeralWindows = 
activeWindowToEphemeralWindows.get(mergeResult);
-    if (existingEphemeralWindows != null) {
-      // Preserve all the existing EPHEMERAL windows for meregResult.
-      newEphemeralWindows.addAll(existingEphemeralWindows);
-    }
-
     for (W other : toBeMerged) {
       Set<W> otherStateAddressWindows = 
activeWindowToStateAddressWindows.get(other);
-      Preconditions.checkState(otherStateAddressWindows != null, "Window %s is 
not ACTIVE", other);
+      Preconditions.checkState(otherStateAddressWindows != null,
+                               "Window %s is not ACTIVE or NEW", other);
 
       for (W otherStateAddressWindow : otherStateAddressWindows) {
         // Since otherTarget equiv other AND other equiv mergeResult
         // THEN otherTarget equiv mergeResult.
         newStateAddressWindows.add(otherStateAddressWindow);
-        windowToActiveWindow.put(otherStateAddressWindow, mergeResult);
       }
       activeWindowToStateAddressWindows.remove(other);
 
-      Set<W> otherEphemeralWindows = activeWindowToEphemeralWindows.get(other);
-      if (otherEphemeralWindows != null) {
-        for (W otherEphemeral : otherEphemeralWindows) {
-          // Since otherEphemeral equiv other AND other equiv mergeResult
-          // THEN otherEphemeral equiv mergeResult.
-          newEphemeralWindows.add(otherEphemeral);
-          windowToActiveWindow.put(otherEphemeral, mergeResult);
-        }
-      }
-      activeWindowToEphemeralWindows.remove(other);
-
       // Now other equiv mergeResult.
-      if (otherStateAddressWindows.contains(other)) {
-        // Other was ACTIVE and is now known to be MERGED.
-      } else if (otherStateAddressWindows.isEmpty()) {
-        // Other was NEW thus has no state. It is now EPHEMERAL.
-        newEphemeralWindows.add(other);
-      } else if (other.equals(mergeResult)) {
-        // Other was ACTIVE, was never used to store elements, but is still 
ACTIVE.
-        // Leave it as active.
-      } else {
-        // Other was ACTIVE, was never used to store element, as is no longer 
considered ACTIVE.
-        // It is now EPHEMERAL.
-        newEphemeralWindows.add(other);
-      }
-      windowToActiveWindow.put(other, mergeResult);
     }
 
     if (newStateAddressWindows.isEmpty()) {
       // If stateAddressWindows is empty then toBeMerged must have only 
contained EPHEMERAL windows.
-      // Promote mergeResult to be active now.
+      // Promote mergeResult to be ACTIVE now.
       newStateAddressWindows.add(mergeResult);
     }
-    windowToActiveWindow.put(mergeResult, mergeResult);
 
     activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows);
-    if (!newEphemeralWindows.isEmpty()) {
-      activeWindowToEphemeralWindows.put(mergeResult, newEphemeralWindows);
-    }
 
     merged(mergeResult);
   }
 
   @Override
   public void merged(W window) {
+    // Take just the first state address window.
     Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
     Preconditions.checkState(stateAddressWindows != null, "Window %s is not 
ACTIVE", window);
     W first = Iterables.getFirst(stateAddressWindows, null);
@@ -410,8 +303,7 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
 
   /**
    * Return the state address windows for ACTIVE {@code window} from which all 
state associated
-   * should
-   * be read and merged.
+   * should be read and merged.
    */
   @Override
   public Set<W> readStateAddresses(W window) {
@@ -454,36 +346,14 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
     for (Map.Entry<W, Set<W>> entry : 
activeWindowToStateAddressWindows.entrySet()) {
       W active = entry.getKey();
       Preconditions.checkState(!entry.getValue().isEmpty(),
-          "Unexpected empty state address window set for ACTIVE window %s",
-          active);
+                               "Unexpected empty state address window set for 
ACTIVE window %s",
+                               active);
       for (W stateAddressWindow : entry.getValue()) {
         
Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow),
-            "%s is in more than one state address window set",
-            stateAddressWindow);
-        
Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)),
-            "%s should have %s as its ACTIVE window", stateAddressWindow,
-            active);
+                                 "%s is in more than one state address window 
set",
+                                 stateAddressWindow);
       }
     }
-    for (Map.Entry<W, Set<W>> entry : 
activeWindowToEphemeralWindows.entrySet()) {
-      W active = entry.getKey();
-      
Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(active),
-          "%s must be ACTIVE window", active);
-      Preconditions.checkState(
-          !entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for 
%s", active);
-      for (W ephemeralWindow : entry.getValue()) {
-        Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow),
-            "%s is EPHEMERAL/state address of more than one ACTIVE window",
-            ephemeralWindow);
-        
Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)),
-            "%s should have %s as its ACTIVE window", ephemeralWindow, active);
-      }
-    }
-    for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) {
-      
Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()),
-          "%s should be ACTIVE since mergeResultWindow for %s",
-          entry.getValue(), entry.getKey());
-    }
   }
 
   @Override
@@ -502,23 +372,9 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
         sb.append(active);
         sb.append(":\n");
         for (W stateAddressWindow : stateAddressWindows) {
-          if (stateAddressWindow.equals(active)) {
-            sb.append("    ACTIVE ");
-          } else {
-            sb.append("    MERGED ");
-          }
+          sb.append("    ");
           sb.append(stateAddressWindow);
           sb.append("\n");
-          W active2 = windowToActiveWindow.get(stateAddressWindow);
-          Preconditions.checkState(active2.equals(active));
-        }
-        Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(active);
-        if (ephemeralWindows != null) {
-          for (W ephemeralWindow : ephemeralWindows) {
-            sb.append("    EPHEMERAL ");
-            sb.append(ephemeralWindow);
-            sb.append('\n');
-          }
         }
       }
     }
@@ -526,10 +382,26 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
     return sb.toString();
   }
 
-  // ======================================================================
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof MergingActiveWindowSet)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    MergingActiveWindowSet<W> other = (MergingActiveWindowSet<W>) o;
+
+    return 
activeWindowToStateAddressWindows.equals(other.activeWindowToStateAddressWindows);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(activeWindowToStateAddressWindows);
+  }
 
   /**
-   * Replace null {@code multimap} with empty map, and replace null entries in 
{@code multimap} with
+   * Replace null {@code multimap} with empty map, and replace null entries in 
{@code multimap}
+   * with
    * empty sets.
    */
   private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> 
multimap) {
@@ -555,22 +427,4 @@ public class MergingActiveWindowSet<W extends 
BoundedWindow> implements ActiveWi
     }
     return newMultimap;
   }
-
-  /**
-   * Return inversion of {@code multimap}, which must be invertible.
-   */
-  private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) {
-    Map<W, W> result = new HashMap<>();
-    for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      W active = entry.getKey();
-      for (W target : entry.getValue()) {
-        W previous = result.put(target, active);
-        Preconditions.checkState(
-            previous == null,
-            "Multimap is not invertible: Window %s has both %s and %s as 
representatives",
-            target, previous, active);
-      }
-    }
-    return result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
index 0d02302..15a4ebe 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
@@ -27,7 +27,8 @@ import java.util.Collection;
 import java.util.Set;
 
 /**
- * Implementation of {@link ActiveWindowSet} used with {@link WindowFn 
WindowFns} that don't support
+ * Implementation of {@link ActiveWindowSet} used with {@link WindowFn 
WindowFns} that don't
+ * support
  * merging.
  *
  * @param <W> the types of windows being managed
@@ -40,13 +41,7 @@ public class NonMergingActiveWindowSet<W extends 
BoundedWindow> implements Activ
   public void persist() {}
 
   @Override
-  public W mergeResultWindow(W window) {
-    // Always represented by itself.
-    return window;
-  }
-
-  @Override
-  public Set<W> getActiveWindows() {
+  public Set<W> getActiveAndNewWindows() {
     // Only supported when merging.
     throw new java.lang.UnsupportedOperationException();
   }
@@ -58,6 +53,11 @@ public class NonMergingActiveWindowSet<W extends 
BoundedWindow> implements Activ
   }
 
   @Override
+  public boolean isActiveOrNew(W window) {
+    return true;
+  }
+
+  @Override
   public void ensureWindowExists(W window) {}
 
   @Override
@@ -65,7 +65,7 @@ public class NonMergingActiveWindowSet<W extends 
BoundedWindow> implements Activ
 
   @Override
   @VisibleForTesting
-  public void addActive(W window) {}
+  public void addActiveForTesting(W window) {}
 
   @Override
   public void remove(W window) {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index fe53389..e916aa8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -47,8 +48,10 @@ import org.joda.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import javax.annotation.Nullable;
@@ -61,21 +64,21 @@ import javax.annotation.Nullable;
  * the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
  *
  * <ul>
- *   <li>Tracking the windows that are active (have buffered data) as elements 
arrive and
- *       triggers are fired.
- *   <li>Holding the watermark based on the timestamps of elements in a pane 
and releasing it
- *       when the trigger fires.
- *   <li>Calling the appropriate callbacks on {@link ReduceFn} based on 
trigger execution, timer
- *       firings, etc, and providing appropriate contexts to the {@link 
ReduceFn} for actions
- *       such as output.
- *   <li>Scheduling garbage collection of state associated with a specific 
window, and making that
- *       happen when the appropriate timer fires.
+ * <li>Tracking the windows that are active (have buffered data) as elements 
arrive and
+ * triggers are fired.
+ * <li>Holding the watermark based on the timestamps of elements in a pane and 
releasing it
+ * when the trigger fires.
+ * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger 
execution, timer
+ * firings, etc, and providing appropriate contexts to the {@link ReduceFn} 
for actions
+ * such as output.
+ * <li>Scheduling garbage collection of state associated with a specific 
window, and making that
+ * happen when the appropriate timer fires.
  * </ul>
  *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
+ * @param <K>       The type of key being processed.
+ * @param <InputT>  The type of values associated with the key.
  * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
+ * @param <W>       The type of windows this operates on.
  */
 public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 
@@ -83,14 +86,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
    * The {@link ReduceFnRunner} depends on most aspects of the {@link 
WindowingStrategy}.
    *
    * <ul>
-   *   <li>It runs the trigger from the {@link WindowingStrategy}.</li>
-   *   <li>It merges windows according to the {@link WindowingStrategy}.</li>
-   *   <li>It chooses how to track active windows and clear out expired windows
-   *       according to the {@link WindowingStrategy}, based on the allowed 
lateness and
-   *       whether windows can merge.</li>
-   *   <li>It decides whether to emit empty final panes according to whether 
the
-   *       {@link WindowingStrategy} requires it.<li>
-   *   <li>It uses discarding or accumulation mode according to the {@link 
WindowingStrategy}.</li>
+   * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
+   * <li>It merges windows according to the {@link WindowingStrategy}.</li>
+   * <li>It chooses how to track active windows and clear out expired windows
+   * according to the {@link WindowingStrategy}, based on the allowed lateness 
and
+   * whether windows can merge.</li>
+   * <li>It decides whether to emit empty final panes according to whether the
+   * {@link WindowingStrategy} requires it.<li>
+   * <li>It uses discarding or accumulation mode according to the {@link 
WindowingStrategy}.</li>
    * </ul>
    */
   private final WindowingStrategy<Object, W> windowingStrategy;
@@ -103,11 +106,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
   private final K key;
 
-  private final OnMergeCallback onMergeCallback = new OnMergeCallback();
-
   /**
-   * Track which windows are still active and which 'state address' windows 
contain state
-   * for a merged window.
+   * Track which windows are still active and the 'state address' windows 
which hold their state.
    *
    * <ul>
    * <li>State: Global map for all active windows for this computation and key.
@@ -251,7 +251,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
   @VisibleForTesting
   boolean hasNoActiveWindows() {
-    return activeWindows.getActiveWindows().isEmpty();
+    return activeWindows.getActiveAndNewWindows().isEmpty();
   }
 
   /**
@@ -260,34 +260,34 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
    *
    * <p>The general strategy is:
    * <ol>
-   *   <li>Use {@link WindowedValue#getWindows} (itself determined using
-   *       {@link WindowFn#assignWindows}) to determine which windows each 
element belongs to. Some
-   *       of those windows will already have state associated with them. The 
rest are considered
-   *       NEW.
-   *   <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently 
ACTIVE and NEW windows.
-   *       Each NEW window will become either ACTIVE, MERGED, or EPHEMERAL. 
(See {@link
-   *       ActiveWindowSet} for definitions of these terms.)
-   *   <li>If at all possible, eagerly substitute EPHEMERAL windows with their 
ACTIVE state address
-   *       windows before any state is associated with the EPHEMERAL window. 
In the common case that
-   *       windows for new elements are merged into existing ACTIVE windows 
then no additional
-   *       storage or merging overhead will be incurred.
-   *   <li>Otherwise, keep track of the state address windows for ACTIVE 
windows so that their
-   *       states can be merged on-demand when a pane fires.
-   *   <li>Process the element for each of the windows it's windows have been 
merged into according
-   *       to {@link ActiveWindowSet}. Processing may require running 
triggers, setting timers,
-   *       setting holds, and invoking {@link ReduceFn#onTrigger}.
+   * <li>Use {@link WindowedValue#getWindows} (itself determined using
+   * {@link WindowFn#assignWindows}) to determine which windows each element 
belongs to. Some
+   * of those windows will already have state associated with them. The rest 
are considered
+   * NEW.
+   * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently 
ACTIVE and NEW windows.
+   * Each NEW window will become either ACTIVE or be discardedL.
+   * (See {@link ActiveWindowSet} for definitions of these terms.)
+   * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE 
state address
+   * windows before any state is associated with the NEW window. In the common 
case that
+   * windows for new elements are merged into existing ACTIVE windows then no 
additional
+   * storage or merging overhead will be incurred.
+   * <li>Otherwise, keep track of the state address windows for ACTIVE windows 
so that their
+   * states can be merged on-demand when a pane fires.
+   * <li>Process the element for each of the windows it's windows have been 
merged into according
+   * to {@link ActiveWindowSet}. Processing may require running triggers, 
setting timers,
+   * setting holds, and invoking {@link ReduceFn#onTrigger}.
    * </ol>
    */
   public void processElements(Iterable<WindowedValue<InputT>> values) throws 
Exception {
     // If an incoming element introduces a new window, attempt to merge it 
into an existing
-    // window eagerly. The outcome is stored in the ActiveWindowSet.
-    collectAndMergeWindows(values);
+    // window eagerly.
+    Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
 
     Set<W> windowsToConsider = new HashSet<>();
 
     // Process each element, using the updated activeWindows determined by 
collectAndMergeWindows.
     for (WindowedValue<InputT> value : values) {
-      windowsToConsider.addAll(processElement(value));
+      windowsToConsider.addAll(processElement(windowToMergeResult, value));
     }
 
     // Trigger output from any window for which the trigger is ready
@@ -303,8 +303,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     // We're all done with merging and emitting elements so can compress the 
activeWindow state.
     // Any windows which are still NEW must have come in on a new element 
which was then discarded
     // due to the window's trigger being closed. We can thus delete them.
-    // Any windows which are EPHEMERAL must have come in on a new element but 
been merged away
-    // into some other ACTIVE window. We can thus also delete them.
     activeWindows.cleanupTemporaryWindows();
   }
 
@@ -313,12 +311,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   }
 
   /**
-   * Extract the windows associated with the values, and invoke merge.
+   * Extract the windows associated with the values, and invoke merge. Return 
a map
+   * from windows to the merge result window. If a window is not in the domain 
of
+   * the result map then it did not get merged into a different window.
    */
-  private void collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) 
throws Exception {
+  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> 
values)
+      throws Exception {
     // No-op if no merging can take place
     if (windowingStrategy.getWindowFn().isNonMerging()) {
-      return;
+      return ImmutableMap.of();
     }
 
     // Collect the windows from all elements (except those which are too late) 
and
@@ -329,6 +330,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
         W window = (W) untypedWindow;
 
         // For backwards compat with pre 1.4 only.
+        // We may still have ACTIVE windows with multiple state addresses, 
representing
+        // a window who's state has not yet been eagerly merged.
+        // We'll go ahead and merge that state now so that we don't have to 
worry about
+        // this legacy case anywhere else.
         if (activeWindows.isActive(window)) {
           Set<W> stateAddressWindows = 
activeWindows.readStateAddresses(window);
           if (stateAddressWindows.size() > 1) {
@@ -342,19 +347,41 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
           }
         }
 
-        // Add this window as NEW if it is not currently ACTIVE or MERGED.
+        // Add this window as NEW if it is not currently ACTIVE.
         // If we had already seen this window and closed its trigger, then the
-        // window will not be ACTIVE or MERGED. It will then be added as NEW 
here,
+        // window will not be currently ACTIVE. It will then be added as NEW 
here,
         // and fall into the merging logic as usual.
         activeWindows.ensureWindowExists(window);
       }
     }
 
     // Merge all of the active windows and retain a mapping from source 
windows to result windows.
-    mergeActiveWindows();
+    Map<W, W> windowToMergeResult = new HashMap<>();
+    activeWindows.merge(new OnMergeCallback(windowToMergeResult));
+    return windowToMergeResult;
   }
 
   private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
+    private final Map<W, W> windowToMergeResult;
+
+    OnMergeCallback(Map<W, W> windowToMergeResult) {
+      this.windowToMergeResult = windowToMergeResult;
+    }
+
+    /**
+     * Return the subset of {@code windows} which are currently ACTIVE. We 
only need to worry
+     * about merging state from ACTIVE windows. NEW windows by definition have 
no existing state.
+     */
+    private List<W> activeWindows(Iterable<W> windows) {
+      List<W> active = new ArrayList<>();
+      for (W window : windows) {
+        if (activeWindows.isActive(window)) {
+          active.add(window);
+        }
+      }
+      return active;
+    }
+
     /**
      * Called from the active window set to indicate {@code toBeMerged} (of 
which only
      * {@code activeToBeMerged} are ACTIVE and thus have state associated with 
them) will later
@@ -362,7 +389,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
      */
     @Override
     public void prefetchOnMerge(
-        Collection<W> toBeMerged, Collection<W> activeToBeMerged, W 
mergeResult) throws Exception {
+        Collection<W> toBeMerged, W mergeResult) throws Exception {
+      List<W> activeToBeMerged = activeWindows(toBeMerged);
       ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
           contextFactory.forMerge(activeToBeMerged, mergeResult, 
StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
@@ -381,9 +409,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
      * to be merged into {@code mergeResult}.
      */
     @Override
-    public void onMerge(Collection<W> toBeMerged, Collection<W> 
activeToBeMerged, W mergeResult)
-        throws Exception {
+    public void onMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+      // Remember we have merged these windows.
+      for (W window : toBeMerged) {
+        windowToMergeResult.put(window, mergeResult);
+      }
+
       // At this point activeWindows has NOT incorporated the results of the 
merge.
+      List<W> activeToBeMerged = activeWindows(toBeMerged);
       ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
           contextFactory.forMerge(activeToBeMerged, mergeResult, 
StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
@@ -407,7 +440,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
           // Not merged away.
           continue;
         }
-        // Cleanup flavor A: Currently ACTIVE window is about to become MERGED.
+        // Cleanup flavor A: Currently ACTIVE window is about to be merged 
away.
         // Clear any state not already cleared by the onMerge calls above.
         WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", 
active, mergeResult);
         ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
@@ -424,17 +457,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     }
   }
 
-  private void mergeActiveWindows() throws Exception {
-    activeWindows.merge(onMergeCallback);
-  }
-
   /**
    * Process an element.
-   * @param value the value being processed
    *
+   * @param value the value being processed
    * @return the set of windows in which the element was actually processed
    */
-  private Collection<W> processElement(WindowedValue<InputT> value) throws 
Exception {
+  private Collection<W> processElement(Map<W, W> windowToMergeResult, 
WindowedValue<InputT> value)
+      throws Exception {
     // Redirect element windows to the ACTIVE windows they have been merged 
into.
     // The compressed representation (value, {window1, window2, ...}) actually 
represents
     // distinct elements (value, window1), (value, window2), ...
@@ -444,12 +474,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     for (BoundedWindow untypedWindow : value.getWindows()) {
       @SuppressWarnings("unchecked")
       W window = (W) untypedWindow;
-
-      ReduceFn<K, InputT, OutputT, W>.Context directContext =
-          contextFactory.base(window, StateStyle.DIRECT);
-      W active = activeWindows.mergeResultWindow(window);
-      Preconditions.checkState(active != null, "Window %s has no 
mergeResultWindow", window);
-      windows.add(active);
+      W mergeResult = windowToMergeResult.get(window);
+      if (mergeResult == null) {
+        mergeResult = window;
+      }
+      windows.add(mergeResult);
     }
 
     // Prefetch in each of the windows if we're going to need to process 
triggers
@@ -464,7 +493,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     for (W window : windows) {
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = 
contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
-
       if (triggerRunner.isClosed(directContext.state())) {
         // This window has already been closed.
         droppedDueToClosedWindow.addValue(1L);
@@ -478,9 +506,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
       triggerableWindows.add(window);
       activeWindows.ensureWindowIsActive(window);
-
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = 
contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
+
       nonEmptyPanes.recordContent(renamedContext.state());
 
       // Make sure we've scheduled the end-of-window or garbage collection 
timer for this window.
@@ -544,10 +572,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     // - If the window function does not support windowing then all windows 
will be considered
     // active.
     // So we must take conjunction of activeWindows and triggerRunner state.
-    boolean windowIsActive =
+    boolean windowIsActiveAndOpen =
         activeWindows.isActive(window) && 
!triggerRunner.isClosed(directContext.state());
 
-    if (!windowIsActive) {
+    if (!windowIsActiveAndOpen) {
       WindowTracing.debug(
           "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window 
{}", timer, window);
     }
@@ -568,7 +596,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
           key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
           timerInternals.currentOutputWatermarkTime());
 
-      if (windowIsActive) {
+      if (windowIsActiveAndOpen) {
         // We need to call onTrigger to emit the final pane if required.
         // The final pane *may* be ON_TIME if no prior ON_TIME pane has been 
emitted,
         // and the watermark has passed the end of the window.
@@ -577,14 +605,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
       // Cleanup flavor B: Clear all the remaining state for this window since 
we'll never
       // see elements for it again.
-      clearAllState(directContext, renamedContext, windowIsActive);
+      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
     } else {
       WindowTracing.debug(
           "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with 
"
           + "inputWatermark:{}; outputWatermark:{}",
           key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
           timerInternals.currentOutputWatermarkTime());
-      if (windowIsActive) {
+      if (windowIsActiveAndOpen) {
         emitIfAppropriate(directContext, renamedContext);
       }
 
@@ -625,9 +653,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   private void clearAllState(
       ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean windowIsActive)
-          throws Exception {
-    if (windowIsActive) {
+      boolean windowIsActiveAndOpen)
+      throws Exception {
+    if (windowIsActiveAndOpen) {
       // Since both the window is in the active window set AND the trigger was 
not yet closed,
       // it is possible we still have state.
       reduceFn.clearState(renamedContext);
@@ -649,10 +677,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       }
     }
     paneInfoTracker.clear(directContext.state());
-    if (activeWindows.isActive(directContext.window())) {
-      // Don't need to track address state windows anymore.
-      activeWindows.remove(directContext.window());
-    }
+    // Don't need to track address state windows anymore.
+    activeWindows.remove(directContext.window());
     // We'll never need to test for the trigger being closed again.
     triggerRunner.clearFinished(directContext.state());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
index 7ab8322..84699d6 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
@@ -39,6 +40,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Test NonMergingActiveWindowSet.
@@ -48,12 +52,17 @@ public class MergingActiveWindowSetTest {
   private Sessions windowFn;
   private StateInternals<String> state;
   private MergingActiveWindowSet<IntervalWindow> set;
+  private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
 
   @Before
-  public void before() {
+  public void setup() {
     windowFn = Sessions.withGapDuration(Duration.millis(10));
     state = InMemoryStateInternals.forKey("dummyKey");
     set = new MergingActiveWindowSet<>(windowFn, state);
+    @SuppressWarnings("unchecked")
+    ActiveWindowSet.MergeCallback<IntervalWindow>
+        callback = mock(ActiveWindowSet.MergeCallback.class);
+    this.callback = callback;
   }
 
   @After
@@ -63,42 +72,78 @@ public class MergingActiveWindowSetTest {
     windowFn = null;
   }
 
-  private void add(final long instant) {
-    final Object element = new Long(instant);
-    Sessions.AssignContext context = windowFn.new AssignContext() {
-      @Override
-      public Object element() {
-        return element;
-      }
+  private void add(long... instants) {
+    for (final long instant : instants) {
+      System.out.println("ADD " + instant);
+      Sessions.AssignContext context = windowFn.new AssignContext() {
+        @Override
+        public Object element() {
+          return (Object) instant;
+        }
 
-      @Override
-      public Instant timestamp() {
-        return new Instant(instant);
-      }
+        @Override
+        public Instant timestamp() {
+          return new Instant(instant);
+        }
 
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return ImmutableList.of();
-      }
-    };
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return ImmutableList.of();
+        }
+      };
 
-    for (IntervalWindow window : windowFn.assignWindows(context)) {
-      set.ensureWindowExists(window);
+      for (IntervalWindow window : windowFn.assignWindows(context)) {
+        set.ensureWindowExists(window);
+      }
     }
   }
 
-  private void merge(ActiveWindowSet.MergeCallback<IntervalWindow> callback) 
throws Exception {
+  private Map<IntervalWindow, IntervalWindow> merge(
+      List<IntervalWindow> toBeMerged,
+      IntervalWindow mergeResult) throws Exception {
+    IntervalWindow predictedPostMergeWriteStateAddress =
+        set.mergedWriteStateAddress(toBeMerged, mergeResult);
+
+    System.out.println("BEFORE MERGE");
+    System.out.println(set);
+    Map<IntervalWindow, IntervalWindow> map = new HashMap<>();
+    for (IntervalWindow window : toBeMerged) {
+      System.out.println("WILL MERGE " + window + " INTO " + mergeResult);
+      map.put(window, mergeResult);
+    }
+    System.out.println("AFTER MERGE");
     set.merge(callback);
-    for (IntervalWindow window : set.getActiveWindows()) {
-      set.ensureWindowIsActive(window);
+    verify(callback).onMerge(toBeMerged, mergeResult);
+    System.out.println(set);
+
+    assertEquals(predictedPostMergeWriteStateAddress, 
set.writeStateAddress(mergeResult));
+
+    return map;
+  }
+
+  private void activate(Map<IntervalWindow, IntervalWindow> map, long... 
instants) {
+    for (long instant : instants) {
+      IntervalWindow window = window(instant, 10);
+      IntervalWindow active = map.get(window);
+      if (active == null) {
+        active = window;
+      }
+      System.out.println("ACTIVATE " + active);
+      set.ensureWindowIsActive(active);
     }
     set.checkInvariants();
   }
 
-  private void pruneAndPersist() {
+  private void cleanup() {
+    System.out.println("CLEANUP");
     set.cleanupTemporaryWindows();
     set.checkInvariants();
+    System.out.println(set);
     set.persist();
+    MergingActiveWindowSet<IntervalWindow> reloaded =
+        new MergingActiveWindowSet<>(windowFn, state);
+    reloaded.checkInvariants();
+    assertEquals(set, reloaded);
   }
 
   private IntervalWindow window(long start, long size) {
@@ -106,70 +151,115 @@ public class MergingActiveWindowSetTest {
   }
 
   @Test
-  public void test() throws Exception {
-    @SuppressWarnings("unchecked")
-    ActiveWindowSet.MergeCallback<IntervalWindow> callback =
-        mock(ActiveWindowSet.MergeCallback.class);
-
+  public void testLifecycle() throws Exception {
+    // Step 1: New elements show up, introducing NEW windows which are 
partially merged.
     // NEW 1+10
     // NEW 2+10
     // NEW 15+10
     // =>
     // ACTIVE 1+11 (target 1+11)
-    // EPHEMERAL 1+10 -> 1+11
-    // EPHEMERAL 2+10 -> 1+11
     // ACTIVE 15+10 (target 15+10)
-    add(1);
-    add(2);
-    add(15);
-    merge(callback);
-    verify(callback).onMerge(ImmutableList.of(window(1, 10), window(2, 10)),
-        ImmutableList.<IntervalWindow>of(), window(1, 11));
-    assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), 
set.getActiveWindows());
-    assertEquals(window(1, 11), set.mergeResultWindow(window(1, 10)));
-    assertEquals(window(1, 11), set.mergeResultWindow(window(2, 10)));
-    assertEquals(window(1, 11), set.mergeResultWindow(window(1, 11)));
-    assertEquals(window(15, 10), set.mergeResultWindow(window(15, 10)));
+    add(1, 2, 15);
+    assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(15, 10)),
+                 set.getActiveAndNewWindows());
+    Map<IntervalWindow, IntervalWindow> map =
+        merge(ImmutableList.of(window(1, 10), window(2, 10)),
+              window(1, 11));
+    activate(map, 1, 2, 15);
+    assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), 
set.getActiveAndNewWindows());
     assertEquals(
-        ImmutableSet.<IntervalWindow>of(window(1, 11)), 
set.readStateAddresses(window(1, 11)));
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 11)));
     assertEquals(
-        ImmutableSet.<IntervalWindow>of(window(15, 10)), 
set.readStateAddresses(window(15, 10)));
+        ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 
10)));
+    cleanup();
 
+    // Step 2: Another element, merged into an existing ACTIVE window.
     // NEW 3+10
     // =>
     // ACTIVE 1+12 (target 1+11)
-    // EPHEMERAL 3+10 -> 1+12
     // ACTIVE 15+10 (target 15+10)
     add(3);
-    merge(callback);
-    verify(callback).onMerge(ImmutableList.of(window(1, 11), window(3, 10)),
-        ImmutableList.<IntervalWindow>of(window(1, 11)), window(1, 12));
-    assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), 
set.getActiveWindows());
-    assertEquals(window(1, 12), set.mergeResultWindow(window(3, 10)));
+    assertEquals(ImmutableSet.of(window(3, 10), window(1, 11), window(15, 10)),
+                 set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 11), window(3, 10)),
+                window(1, 12));
+    activate(map, 3);
+    assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), 
set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 12)));
+    assertEquals(
+        ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 
10)));
+    cleanup();
 
+    // Step 3: Another element, causing two ACTIVE windows to be merged.
     // NEW 8+10
     // =>
-    // ACTIVE 1+24 (target 1+11, 15+10)
-    // MERGED 1+11 -> 1+24
-    // MERGED 15+10 -> 1+24
-    // EPHEMERAL 1+12 -> 1+24
+    // ACTIVE 1+24 (target 1+11)
     add(8);
-    merge(callback);
-    verify(callback).onMerge(ImmutableList.of(window(1, 12), window(8, 10), 
window(15, 10)),
-        ImmutableList.<IntervalWindow>of(window(1, 12), window(15, 10)), 
window(1, 24));
-    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveWindows());
-    assertEquals(window(1, 24), set.mergeResultWindow(window(1, 12)));
-    assertEquals(window(1, 24), set.mergeResultWindow(window(1, 11)));
-    assertEquals(window(1, 24), set.mergeResultWindow(window(15, 10)));
+    assertEquals(ImmutableSet.of(window(8, 10), window(1, 12), window(15, 10)),
+                 set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)),
+                window(1, 24));
+    activate(map, 8);
+    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
+    cleanup();
 
+    // Step 4: Another element, merged into an existing ACTIVE window.
     // NEW 9+10
     // =>
-    // ACTIVE 1+24 (target 1+11, 15+10)
+    // ACTIVE 1+24 (target 1+11)
     add(9);
-    merge(callback);
-    verify(callback).onMerge(ImmutableList.of(window(1, 24), window(9, 10)),
-        ImmutableList.<IntervalWindow>of(window(1, 24)), window(1, 24));
+    assertEquals(ImmutableSet.of(window(9, 10), window(1, 24)), 
set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 24), window(9, 10)),
+                window(1, 24));
+    activate(map, 9);
+    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
+    cleanup();
+
+    // Step 5: Another element reusing earlier window, merged into an existing 
ACTIVE window.
+    // NEW 1+10
+    // =>
+    // ACTIVE 1+24 (target 1+11)
+    add(1);
+    assertEquals(ImmutableSet.of(window(1, 10), window(1, 24)), 
set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 10), window(1, 24)),
+                window(1, 24));
+    activate(map, 1);
+    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
+    cleanup();
+
+    // Step 6: Window is closed.
+    set.remove(window(1, 24));
+    cleanup();
+    assertTrue(set.getActiveAndNewWindows().isEmpty());
+  }
+
+  @Test
+  public void testLegacyState() {
+    // Pre 1.4 we merged window state lazily.
+    // Simulate loading an active window set with multiple state address 
windows.
+    set.addActiveForTesting(window(1, 12),
+                            ImmutableList.of(window(1, 10), window(2, 10), 
window(3, 10)));
+
+
+    // Make sure we can detect and repair the state.
+    assertTrue(set.isActive(window(1, 12)));
+    assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(3, 10)),
+                 set.readStateAddresses(window(1, 12)));
+    assertEquals(window(1, 10),
+                 set.mergedWriteStateAddress(
+                     ImmutableList.of(window(1, 10), window(2, 10), window(3, 
10)),
+                     window(1, 12)));
+    set.merged(window(1, 12));
+    cleanup();
 
-    pruneAndPersist();
+    // For then on we are back to the eager case.
+    assertEquals(ImmutableSet.of(window(1, 10)), 
set.readStateAddresses(window(1, 12)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index 0889b4f..a1e376e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -50,6 +50,7 @@ import com.google.common.collect.Maps;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -104,6 +105,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> 
{
   private final TriggerContextFactory<W> contextFactory;
   private final WindowFn<Object, W> windowFn;
   private final ActiveWindowSet<W> activeWindows;
+  private final Map<W, W> windowToMergeResult;
 
   /**
    * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link 
TriggerBuilder}
@@ -155,6 +157,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> 
{
         windowFn.isNonMerging()
             ? new NonMergingActiveWindowSet<W>()
             : new MergingActiveWindowSet<W>(windowFn, stateInternals);
+    this.windowToMergeResult = new HashMap<>();
 
     this.contextFactory =
         new TriggerContextFactory<>(windowingStrategy, stateInternals, 
activeWindows);
@@ -245,7 +248,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> 
{
             windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE)));
 
         for (W window : assignedWindows) {
-          activeWindows.addActive(window);
+          activeWindows.addActiveForTesting(window);
 
           // Today, triggers assume onTimer firing at the watermark time, 
whether or not they
           // explicitly set the timer themselves. So this tester must set it.
@@ -263,7 +266,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> 
{
       for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
         // SDK is responsible for type safety
         @SuppressWarnings("unchecked")
-        W window = activeWindows.mergeResultWindow((W) untypedWindow);
+        W window = mergeResult((W) untypedWindow);
 
         Trigger.OnElementContext context = 
contextFactory.createOnElementContext(window,
             new TestTimers(windowNamespace(window)), 
windowedValue.getTimestamp(),
@@ -312,14 +315,20 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> {
    * since it is just to test the trigger's {@code OnMerge} method.
    */
   public final void mergeWindows() throws Exception {
+    windowToMergeResult.clear();
     activeWindows.merge(new MergeCallback<W>() {
       @Override
-      public void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> 
activeToBeMerged,
-          W mergeResult) throws Exception {}
+      public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) 
throws Exception {}
 
       @Override
-      public void onMerge(Collection<W> toBeMerged, Collection<W> 
activeToBeMerged, W mergeResult)
-          throws Exception {
+      public void onMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+        List<W> activeToBeMerged = new ArrayList<W>();
+        for (W window : toBeMerged) {
+          windowToMergeResult.put(window, mergeResult);
+          if (activeWindows.isActive(window)) {
+            activeToBeMerged.add(window);
+          }
+        }
         Map<W, FinishedTriggers> mergingFinishedSets =
             Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
         for (W oldWindow : activeToBeMerged) {
@@ -334,6 +343,11 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> {
     });
   }
 
+  public  W mergeResult(W window) {
+    W result = windowToMergeResult.get(window);
+    return result == null ? window : result;
+  }
+
   private FinishedTriggers getFinishedSet(W window) {
     FinishedTriggers finishedSet = finishedSets.get(window);
     if (finishedSet == null) {

Reply via email to