Repository: incubator-beam Updated Branches: refs/heads/master 27abf446b -> 68b8cbc81
Factor toBeMerged->mergeResult map out of MergingActiveWindowSet Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d534ac50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d534ac50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d534ac50 Branch: refs/heads/master Commit: d534ac503a8fcd7e6c2e6eeef0e7da0e48b98d2a Parents: 27abf44 Author: Mark Shields <markshie...@google.com> Authored: Mon May 23 17:06:48 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon May 23 21:04:51 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/ActiveWindowSet.java | 109 ++++--- .../beam/sdk/util/MergingActiveWindowSet.java | 294 +++++-------------- .../sdk/util/NonMergingActiveWindowSet.java | 18 +- .../apache/beam/sdk/util/ReduceFnRunner.java | 190 ++++++------ .../sdk/util/MergingActiveWindowSetTest.java | 220 ++++++++++---- .../org/apache/beam/sdk/util/TriggerTester.java | 26 +- 6 files changed, 416 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java index e1ab9e9..02c12c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.WindowFn; import 
com.google.common.annotations.VisibleForTesting; @@ -26,44 +25,40 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.Set; -import javax.annotation.Nullable; - /** - * Track which active windows have their state associated with merged-away windows. + * Track which windows are <i>active</i>, and the <i>state address window(s)</i> under which their + * state is stored. Also help with the multi-step process of merging windows and their associated + * state. + * + * <p>When windows are merged we must also merge their state. For example, we may need to + * concatenate buffered elements, sum a count of elements, or find a new minimum timestamp. + * If we start with two windows {@code Wa} and {@code Wb} and later discover they should be + * merged into window {@code Wab} then, naively, we must copy and merge the states of {@code Wa} + * and {@code Wb} into {@code Wab}. * - * When windows are merged we must track which state previously associated with the merged windows - * must now be associated with the result window. Some of that state may be combined eagerly when - * the windows are merged. The rest is combined lazily when the final state is actually - * required when emitting a pane. We keep track of this using an {@link ActiveWindowSet}. + * <p>However, the common case for merging windows is for a new window to be merged into an + * existing window. Thus, if {@code Wa} is the existing window and {@code Wb} the new window it + * is more efficient to leave the state for {@code Wa} where it is, and simply redirect {@code + * Wab} to it. In this case we say {@code Wab} has a state address window of {@code Wa}. 
* - * <p>An {@link ActiveWindowSet} considers a window to be in one of the following states: + * <p>Even if windows {@code Wa} and {@code Wb} already have state, it can still be more efficient + * to append the state of {@code Wb} onto {@code Wa} rather than copy the state from {@code Wa} + * and {@code Wb} into {@code Wab}. * + * <p>We use the following terminology for windows: * <ol> - * <li><b>NEW</b>: The initial state for a window on an incoming element; we do not yet know - * if it should be merged into an ACTIVE window, or whether it is already present as an - * ACTIVE window, since we have not yet called - * {@link WindowFn#mergeWindows}.</li> - * <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged - * away. The window may have one or more <i>state address</i> windows under which its - * non-empty state is stored. A state value for an ACTIVE window must be derived by reading - * the state in all of its state address windows.</li> - * <li><b>EPHEMERAL</b>: A NEW window that has been merged into an ACTIVE window before any state - * has been associated with that window. Thus the window is neither ACTIVE nor MERGED. These - * windows are not persistently represented since if they reappear the merge function should - * again redirect them to an ACTIVE window. EPHEMERAL windows are an optimization for - * the common case of in-order events and {@link Sessions session window} by never associating - * state with windows that are created and immediately merged away.</li> - * <li><b>MERGED</b>: An ACTIVE window has been merged into another ACTIVE window after it had - * state associated with it. The window will thus appear as a state address window for exactly - * one ACTIVE window.</li> - * <li><b>EXPIRED</b>: The window has expired and may have been garbage collected. No new elements - * (even late elements) will ever be assigned to that window. 
These windows are not explicitly - * represented anywhere; it is expected that the user of {@link ActiveWindowSet} will store - * no state associated with the window.</li> + * <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged + * away. The window may have one (or more) state address windows under which its + * non-empty state is stored. A state value for an ACTIVE window must be derived by reading + * the state in (all of) its state address windows. Note that only pre 1.4 pipelines + * use multiple state address windows per active window. From 1.4 onwards we eagerly merge + * window state into a single state address window. + * <li><b>NEW</b>: The initial state for a window of an incoming element which is not + * already ACTIVE. We have not yet called {@link WindowFn#mergeWindows}, and so don't yet know + * whether the window will be merged into another NEW or ACTIVE window, or will + * become an ACTIVE window in its own right. * </ol> * - * <p> - * * <p>If no windows will ever be merged we can use the trivial implementation {@link * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this data structure is in * {@link MergingActiveWindowSet}. @@ -78,27 +73,25 @@ public interface ActiveWindowSet<W extends BoundedWindow> { /** * Called when windows are about to be merged, but before any {@link #onMerge} callback * has been made. + * + * @param toBeMerged the windows about to be merged. + * @param mergeResult the result window, either a member of {@code toBeMerged} or new. */ - void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult) - throws Exception; + void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception; /** * Called when windows are about to be merged, after all {@link #prefetchOnMerge} calls * have been made, but before the active window set has been updated to reflect the merge. * - * @param toBeMerged the windows about to be merged. 
- * @param activeToBeMerged the subset of {@code toBeMerged} corresponding to windows which - * are currently ACTIVE (and about to be merged). The remaining windows have been deemed - * EPHEMERAL, and thus have no state associated with them. + * @param toBeMerged the windows about to be merged. * @param mergeResult the result window, either a member of {@code toBeMerged} or new. */ - void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult) - throws Exception; + void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception; } /** - * Remove EPHEMERAL windows and remaining NEW windows since we only need to know about them - * while processing new elements. + * Remove any remaining NEW windows since they were not promoted to being ACTIVE + * by {@link #ensureWindowIsActive} and we don't need to record anything about them. */ void cleanupTemporaryWindows(); @@ -108,42 +101,40 @@ public interface ActiveWindowSet<W extends BoundedWindow> { void persist(); /** - * Return the ACTIVE window into which {@code window} has been merged. - * Return {@code window} itself if it is ACTIVE. Return null if {@code window} has not - * yet been seen. + * Return (a view of) the set of currently ACTIVE and NEW windows. */ - @Nullable - W mergeResultWindow(W window); + Set<W> getActiveAndNewWindows(); /** - * Return (a view of) the set of currently ACTIVE windows. + * Return {@code true} if {@code window} is ACTIVE. */ - Set<W> getActiveWindows(); + boolean isActive(W window); /** - * Return {@code true} if {@code window} is ACTIVE. + * Return {@code true} if {@code window} is ACTIVE or NEW. */ - boolean isActive(W window); + boolean isActiveOrNew(W window); /** * Called when an incoming element indicates it is a member of {@code window}, but before we * have started processing that element. If {@code window} is not already known to be ACTIVE, - * MERGED or EPHEMERAL then add it as NEW. + * then add it as NEW. 
*/ void ensureWindowExists(W window); /** * Called when a NEW or ACTIVE window is now known to be ACTIVE. - * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address window). + * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address + * window). */ void ensureWindowIsActive(W window); /** - * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it - * as ACTIVE. + * If {@code window} is not already known to be ACTIVE then add it as ACTIVE. + * For testing only. */ @VisibleForTesting - void addActive(W window); + void addActiveForTesting(W window); /** * Remove {@code window} from the set. @@ -152,9 +143,9 @@ public interface ActiveWindowSet<W extends BoundedWindow> { /** * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated with this window set, - * merging as many of the active windows as possible. {@code mergeCallback} will be invoked for - * each group of windows that are merged. After this no NEW windows will remain, all merge - * result windows will be ACTIVE, and all windows which have been merged away will not be ACTIVE. + * merging as many of the NEW and ACTIVE windows as possible. {@code mergeCallback} will be + * invoked for each group of windows that are merged. After this all merge result windows will + * be ACTIVE, and all windows which have been merged away will be neither ACTIVE nor NEW. 
*/ void merge(MergeCallback<W> mergeCallback) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java index 13c1e34..07e47aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.state.ValueState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Collection; @@ -39,71 +40,30 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import javax.annotation.Nullable; /** * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations. - * <p> - * <p>The underlying notion of {@link MergingActiveWindowSet} is that of representing equivalence - * classes of merged windows as a mapping from the merged "super-window" to a set of - * <i>state address</i> windows in which some state has been persisted. The mapping need not - * contain EPHEMERAL windows, because they are created and merged without any persistent state. - * Each window must be a state address window for at most one window, so the mapping is - * invertible. 
- * <p> - * <p>The states of a non-expired window are treated as follows: - * <p> - * <ul> - * <li><b>NEW</b>: a NEW has an empty set of associated state address windows.</li> - * <li><b>ACTIVE</b>: an ACTIVE window will be associated with some nonempty set of state - * address windows. If the window has not merged, this will necessarily be the singleton set - * containing just itself, but it is not required that an ACTIVE window be amongst its - * state address windows.</li> - * <li><b>MERGED</b>: a MERGED window will be in the set of associated windows for some - * other window - that window is retrieved via {@link #mergeResultWindow} (this reverse - * association is implemented in O(1) time).</li> - * <li><b>EPHEMERAL</b>: EPHEMERAL windows are not persisted but are tracked transiently; - * an EPHEMERAL window must be registered with this {@link ActiveWindowSet} by a call - * to {@link #recordMerge} prior to any request for a {@link #mergeResultWindow}.</li> - * </ul> - * <p> - * <p>To illustrate why an ACTIVE window need not be amongst its own state address windows, - * consider two active windows W1 and W2 that are merged to form W12. Further writes may be - * applied to either of W1 or W2, since a read of W12 implies reading both of W12 and merging - * their results. Hence W12 need not have state directly associated with it. */ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> { private final WindowFn<Object, W> windowFn; /** * Map ACTIVE and NEW windows to their state address windows. Persisted. - */ - private final Map<W, Set<W>> activeWindowToStateAddressWindows; - - /** - * As above, but only for EPHEMERAL windows. Does not need to be persisted. - */ - private final Map<W, Set<W>> activeWindowToEphemeralWindows; - - /** - * A map from window to the ACTIVE window it has been merged into. Does not need to be persisted. - * <p> * <ul> - * <li>Key window may be ACTIVE, MERGED or EPHEMERAL. 
- * <li>ACTIVE windows map to themselves. - * <li>If W1 maps to W2 then W2 is in {@link #activeWindowToStateAddressWindows}. - * <li>If W1 = W2 then W1 is ACTIVE. If W1 is in the state address window set for W2 then W1 is - * MERGED. Otherwise W1 is EPHEMERAL. + * <li>A NEW window has the empty set as its value. + * <li>An ACTIVE window has its (typically singleton) set of state address windows as + * its value. * </ul> */ - private final Map<W, W> windowToActiveWindow; + private final Map<W, Set<W>> activeWindowToStateAddressWindows; /** * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit. - * <p> - * <p>Used to avoid writing to state if no changes have been made during the work unit. + * Used to avoid writing to state if no changes have been made during the work unit. */ private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows; @@ -115,42 +75,32 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) { this.windowFn = windowFn; - StateTag<Object, ValueState<Map<W, Set<W>>>> mergeTreeAddr = + StateTag<Object, ValueState<Map<W, Set<W>>>> tag = StateTags.makeSystemTagInternal(StateTags.value( "tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder())))); - valueState = state.state(StateNamespaces.global(), mergeTreeAddr); - // Little use trying to prefetch this state since the ReduceFnRunner is stymied until it is - // available. + valueState = state.state(StateNamespaces.global(), tag); + // Little use trying to prefetch this state since the ReduceFnRunner + // is stymied until it is available. 
activeWindowToStateAddressWindows = emptyIfNull(valueState.read()); - activeWindowToEphemeralWindows = new HashMap<>(); originalActiveWindowToStateAddressWindows = deepCopy(activeWindowToStateAddressWindows); - windowToActiveWindow = invert(activeWindowToStateAddressWindows); } @Override public void cleanupTemporaryWindows() { - // All NEW windows can be forgotten. - Iterator<Map.Entry<W, Set<W>>> iter = - activeWindowToStateAddressWindows.entrySet().iterator(); + // All NEW windows can be forgotten since they must have ended up being merged into + // some other ACTIVE window. + Iterator<Map.Entry<W, Set<W>>> iter = activeWindowToStateAddressWindows.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<W, Set<W>> entry = iter.next(); if (entry.getValue().isEmpty()) { - windowToActiveWindow.remove(entry.getKey()); iter.remove(); } } - - // All EPHEMERAL windows can be forgotten. - for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) { - for (W ephemeral : entry.getValue()) { - windowToActiveWindow.remove(ephemeral); - } - } - activeWindowToEphemeralWindows.clear(); } @Override public void persist() { + checkInvariants(); if (activeWindowToStateAddressWindows.isEmpty()) { // Force all persistent state to disappear. valueState.clear(); @@ -160,42 +110,32 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi // No change. return; } - // All NEW windows must have been accounted for. - for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) { - Preconditions.checkState( - !entry.getValue().isEmpty(), "Cannot persist NEW window %s", entry.getKey()); - } - // Should be no EPHEMERAL windows. - Preconditions.checkState( - activeWindowToEphemeralWindows.isEmpty(), "Unexpected EPHEMERAL windows before persist"); - valueState.write(activeWindowToStateAddressWindows); // No need to update originalActiveWindowToStateAddressWindows since this object is about to // become garbage. 
} @Override - @Nullable - public W mergeResultWindow(W window) { - return windowToActiveWindow.get(window); + public Set<W> getActiveAndNewWindows() { + return activeWindowToStateAddressWindows.keySet(); } @Override - public Set<W> getActiveWindows() { - return activeWindowToStateAddressWindows.keySet(); + public boolean isActive(W window) { + Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); + return stateAddressWindows != null && !stateAddressWindows.isEmpty(); } @Override - public boolean isActive(W window) { + public boolean isActiveOrNew(W window) { return activeWindowToStateAddressWindows.containsKey(window); } @Override public void ensureWindowExists(W window) { - if (!windowToActiveWindow.containsKey(window)) { - Preconditions.checkState(!activeWindowToStateAddressWindows.containsKey(window)); + if (!activeWindowToStateAddressWindows.containsKey(window)) { + // Add window as NEW. activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>()); - windowToActiveWindow.put(window, window); } } @@ -203,48 +143,40 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi public void ensureWindowIsActive(W window) { Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); Preconditions.checkState(stateAddressWindows != null, - "Cannot ensure window %s is active since it is neither ACTIVE nor NEW", - window); + "Cannot ensure window %s is active since it is neither ACTIVE nor NEW", + window); if (stateAddressWindows.isEmpty()) { - // Window was NEW, make it ACTIVE. - Preconditions.checkState(windowToActiveWindow.containsKey(window) - && windowToActiveWindow.get(window).equals(window)); + // Window was NEW, make it ACTIVE with itself as its state address window. 
stateAddressWindows.add(window); } } @Override @VisibleForTesting - public void addActive(W window) { - if (!windowToActiveWindow.containsKey(window)) { + public void addActiveForTesting(W window) { + if (!activeWindowToStateAddressWindows.containsKey(window)) { + // Make window ACTIVE with itself as its state address window. Set<W> stateAddressWindows = new LinkedHashSet<>(); stateAddressWindows.add(window); activeWindowToStateAddressWindows.put(window, stateAddressWindows); - windowToActiveWindow.put(window, window); + } + } + + @VisibleForTesting + public void addActiveForTesting(W window, Iterable<W> stateAddressWindows) { + if (!activeWindowToStateAddressWindows.containsKey(window)) { + activeWindowToStateAddressWindows.put(window, Sets.newLinkedHashSet(stateAddressWindows)); } } @Override public void remove(W window) { - Set<W> stateAddressWindows = activeWindowToStateAddressWindows.remove(window); - if (stateAddressWindows != null) { - for (W stateAddressWindow : stateAddressWindows) { - windowToActiveWindow.remove(stateAddressWindow); - } - } - Set<W> ephemeralWindows = activeWindowToEphemeralWindows.remove(window); - if (ephemeralWindows != null) { - for (W ephemeralWindow : ephemeralWindows) { - windowToActiveWindow.remove(ephemeralWindow); - } - } - windowToActiveWindow.remove(window); + activeWindowToStateAddressWindows.remove(window); } private class MergeContextImpl extends WindowFn<Object, W>.MergeContext { private MergeCallback<W> mergeCallback; private final List<Collection<W>> allToBeMerged; - private final List<Collection<W>> allActiveToBeMerged; private final List<W> allMergeResults; private final Set<W> seen; @@ -252,7 +184,6 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi windowFn.super(); this.mergeCallback = mergeCallback; allToBeMerged = new ArrayList<>(); - allActiveToBeMerged = new ArrayList<>(); allMergeResults = new ArrayList<>(); seen = new HashSet<>(); } @@ -268,12 +199,11 @@ public class 
MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi Preconditions.checkNotNull(toBeMerged); Preconditions.checkNotNull(mergeResult); List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size()); - List<W> activeToBeMerged = new ArrayList<>(toBeMerged.size()); boolean includesMergeResult = false; for (W window : toBeMerged) { Preconditions.checkNotNull(window); Preconditions.checkState( - isActive(window), "Expecting merge window %s to be active", window); + isActiveOrNew(window), "Expecting merge window %s to be ACTIVE or NEW", window); if (window.equals(mergeResult)) { includesMergeResult = true; } @@ -281,31 +211,24 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi Preconditions.checkState( notDup, "Expecting merge window %s to appear in at most one merge set", window); copyOfToBeMerged.add(window); - if (!activeWindowToStateAddressWindows.get(window).isEmpty()) { - activeToBeMerged.add(window); - } } if (!includesMergeResult) { Preconditions.checkState( - !isActive(mergeResult), "Expecting result window %s to be new", mergeResult); + !isActive(mergeResult), "Expecting result window %s to be NEW", mergeResult); } allToBeMerged.add(copyOfToBeMerged); - allActiveToBeMerged.add(activeToBeMerged); allMergeResults.add(mergeResult); } public void recordMerges() throws Exception { for (int i = 0; i < allToBeMerged.size(); i++) { - mergeCallback.prefetchOnMerge( - allToBeMerged.get(i), allActiveToBeMerged.get(i), allMergeResults.get(i)); + mergeCallback.prefetchOnMerge(allToBeMerged.get(i), allMergeResults.get(i)); } for (int i = 0; i < allToBeMerged.size(); i++) { - mergeCallback.onMerge( - allToBeMerged.get(i), allActiveToBeMerged.get(i), allMergeResults.get(i)); + mergeCallback.onMerge(allToBeMerged.get(i), allMergeResults.get(i)); recordMerge(allToBeMerged.get(i), allMergeResults.get(i)); } allToBeMerged.clear(); - allActiveToBeMerged.clear(); allMergeResults.clear(); seen.clear(); } @@ -330,6 +253,11 @@ public class 
MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi * the active window set. */ private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + // Note that mergedWriteStateAddress must predict the result of writeStateAddress + // after the corresponding merge has been applied. + // Thus we must ensure the first state address window in the merged result here is + // deterministic. + // Thus we use a linked hash set. Set<W> newStateAddressWindows = new LinkedHashSet<>(); Set<W> existingStateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult); if (existingStateAddressWindows != null) { @@ -337,70 +265,35 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi newStateAddressWindows.addAll(existingStateAddressWindows); } - Set<W> newEphemeralWindows = new HashSet<>(); - Set<W> existingEphemeralWindows = activeWindowToEphemeralWindows.get(mergeResult); - if (existingEphemeralWindows != null) { - // Preserve all the existing EPHEMERAL windows for meregResult. - newEphemeralWindows.addAll(existingEphemeralWindows); - } - for (W other : toBeMerged) { Set<W> otherStateAddressWindows = activeWindowToStateAddressWindows.get(other); - Preconditions.checkState(otherStateAddressWindows != null, "Window %s is not ACTIVE", other); + Preconditions.checkState(otherStateAddressWindows != null, + "Window %s is not ACTIVE or NEW", other); for (W otherStateAddressWindow : otherStateAddressWindows) { // Since otherTarget equiv other AND other equiv mergeResult // THEN otherTarget equiv mergeResult. 
newStateAddressWindows.add(otherStateAddressWindow); - windowToActiveWindow.put(otherStateAddressWindow, mergeResult); } activeWindowToStateAddressWindows.remove(other); - Set<W> otherEphemeralWindows = activeWindowToEphemeralWindows.get(other); - if (otherEphemeralWindows != null) { - for (W otherEphemeral : otherEphemeralWindows) { - // Since otherEphemeral equiv other AND other equiv mergeResult - // THEN otherEphemeral equiv mergeResult. - newEphemeralWindows.add(otherEphemeral); - windowToActiveWindow.put(otherEphemeral, mergeResult); - } - } - activeWindowToEphemeralWindows.remove(other); - // Now other equiv mergeResult. - if (otherStateAddressWindows.contains(other)) { - // Other was ACTIVE and is now known to be MERGED. - } else if (otherStateAddressWindows.isEmpty()) { - // Other was NEW thus has no state. It is now EPHEMERAL. - newEphemeralWindows.add(other); - } else if (other.equals(mergeResult)) { - // Other was ACTIVE, was never used to store elements, but is still ACTIVE. - // Leave it as active. - } else { - // Other was ACTIVE, was never used to store element, as is no longer considered ACTIVE. - // It is now EPHEMERAL. - newEphemeralWindows.add(other); - } - windowToActiveWindow.put(other, mergeResult); } if (newStateAddressWindows.isEmpty()) { // If stateAddressWindows is empty then toBeMerged must have only contained EPHEMERAL windows. - // Promote mergeResult to be active now. + // Promote mergeResult to be ACTIVE now. newStateAddressWindows.add(mergeResult); } - windowToActiveWindow.put(mergeResult, mergeResult); activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows); - if (!newEphemeralWindows.isEmpty()) { - activeWindowToEphemeralWindows.put(mergeResult, newEphemeralWindows); - } merged(mergeResult); } @Override public void merged(W window) { + // Take just the first state address window. 
Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); Preconditions.checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window); W first = Iterables.getFirst(stateAddressWindows, null); @@ -410,8 +303,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi /** * Return the state address windows for ACTIVE {@code window} from which all state associated - * should - * be read and merged. + * should be read and merged. */ @Override public Set<W> readStateAddresses(W window) { @@ -454,36 +346,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) { W active = entry.getKey(); Preconditions.checkState(!entry.getValue().isEmpty(), - "Unexpected empty state address window set for ACTIVE window %s", - active); + "Unexpected empty state address window set for ACTIVE window %s", + active); for (W stateAddressWindow : entry.getValue()) { Preconditions.checkState(knownStateAddressWindows.add(stateAddressWindow), - "%s is in more than one state address window set", - stateAddressWindow); - Preconditions.checkState(active.equals(windowToActiveWindow.get(stateAddressWindow)), - "%s should have %s as its ACTIVE window", stateAddressWindow, - active); + "%s is in more than one state address window set", + stateAddressWindow); } } - for (Map.Entry<W, Set<W>> entry : activeWindowToEphemeralWindows.entrySet()) { - W active = entry.getKey(); - Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(active), - "%s must be ACTIVE window", active); - Preconditions.checkState( - !entry.getValue().isEmpty(), "Unexpected empty EPHEMERAL set for %s", active); - for (W ephemeralWindow : entry.getValue()) { - Preconditions.checkState(knownStateAddressWindows.add(ephemeralWindow), - "%s is EPHEMERAL/state address of more than one ACTIVE window", - ephemeralWindow); - 
Preconditions.checkState(active.equals(windowToActiveWindow.get(ephemeralWindow)), - "%s should have %s as its ACTIVE window", ephemeralWindow, active); - } - } - for (Map.Entry<W, W> entry : windowToActiveWindow.entrySet()) { - Preconditions.checkState(activeWindowToStateAddressWindows.containsKey(entry.getValue()), - "%s should be ACTIVE since mergeResultWindow for %s", - entry.getValue(), entry.getKey()); - } } @Override @@ -502,23 +372,9 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi sb.append(active); sb.append(":\n"); for (W stateAddressWindow : stateAddressWindows) { - if (stateAddressWindow.equals(active)) { - sb.append(" ACTIVE "); - } else { - sb.append(" MERGED "); - } + sb.append(" "); sb.append(stateAddressWindow); sb.append("\n"); - W active2 = windowToActiveWindow.get(stateAddressWindow); - Preconditions.checkState(active2.equals(active)); - } - Set<W> ephemeralWindows = activeWindowToEphemeralWindows.get(active); - if (ephemeralWindows != null) { - for (W ephemeralWindow : ephemeralWindows) { - sb.append(" EPHEMERAL "); - sb.append(ephemeralWindow); - sb.append('\n'); - } } } } @@ -526,10 +382,26 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi return sb.toString(); } - // ====================================================================== + @Override + public boolean equals(Object o) { + if (!(o instanceof MergingActiveWindowSet)) { + return false; + } + + @SuppressWarnings("unchecked") + MergingActiveWindowSet<W> other = (MergingActiveWindowSet<W>) o; + + return activeWindowToStateAddressWindows.equals(other.activeWindowToStateAddressWindows); + } + + @Override + public int hashCode() { + return Objects.hashCode(activeWindowToStateAddressWindows); + } /** - * Replace null {@code multimap} with empty map, and replace null entries in {@code multimap} with + * Replace null {@code multimap} with empty map, and replace null entries in {@code multimap} + * with * empty sets. 
*/ private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap) { @@ -555,22 +427,4 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi } return newMultimap; } - - /** - * Return inversion of {@code multimap}, which must be invertible. - */ - private static <W> Map<W, W> invert(Map<W, Set<W>> multimap) { - Map<W, W> result = new HashMap<>(); - for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) { - W active = entry.getKey(); - for (W target : entry.getValue()) { - W previous = result.put(target, active); - Preconditions.checkState( - previous == null, - "Multimap is not invertible: Window %s has both %s and %s as representatives", - target, previous, active); - } - } - return result; - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java index 0d02302..15a4ebe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java @@ -27,7 +27,8 @@ import java.util.Collection; import java.util.Set; /** - * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't support + * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't + * support * merging. * * @param <W> the types of windows being managed @@ -40,13 +41,7 @@ public class NonMergingActiveWindowSet<W extends BoundedWindow> implements Activ public void persist() {} @Override - public W mergeResultWindow(W window) { - // Always represented by itself. 
- return window; - } - - @Override - public Set<W> getActiveWindows() { + public Set<W> getActiveAndNewWindows() { // Only supported when merging. throw new java.lang.UnsupportedOperationException(); } @@ -58,6 +53,11 @@ public class NonMergingActiveWindowSet<W extends BoundedWindow> implements Activ } @Override + public boolean isActiveOrNew(W window) { + return true; + } + + @Override public void ensureWindowExists(W window) {} @Override @@ -65,7 +65,7 @@ public class NonMergingActiveWindowSet<W extends BoundedWindow> implements Activ @Override @VisibleForTesting - public void addActive(W window) {} + public void addActiveForTesting(W window) {} @Override public void remove(W window) {} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index fe53389..e916aa8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.values.PCollection; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.joda.time.Duration; import org.joda.time.Instant; @@ -47,8 +48,10 @@ import org.joda.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import javax.annotation.Nullable; @@ -61,21 +64,21 @@ import javax.annotation.Nullable; * the triggering logic. 
The {@code ReduceFnRunner}s responsibilities are: * * <ul> - * <li>Tracking the windows that are active (have buffered data) as elements arrive and - * triggers are fired. - * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it - * when the trigger fires. - * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer - * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions - * such as output. - * <li>Scheduling garbage collection of state associated with a specific window, and making that - * happen when the appropriate timer fires. + * <li>Tracking the windows that are active (have buffered data) as elements arrive and + * triggers are fired. + * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it + * when the trigger fires. + * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer + * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions + * such as output. + * <li>Scheduling garbage collection of state associated with a specific window, and making that + * happen when the appropriate timer fires. * </ul> * - * @param <K> The type of key being processed. - * @param <InputT> The type of values associated with the key. + * @param <K> The type of key being processed. + * @param <InputT> The type of values associated with the key. * @param <OutputT> The output type that will be produced for each key. - * @param <W> The type of windows this operates on. + * @param <W> The type of windows this operates on. */ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @@ -83,14 +86,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}. 
* * <ul> - * <li>It runs the trigger from the {@link WindowingStrategy}.</li> - * <li>It merges windows according to the {@link WindowingStrategy}.</li> - * <li>It chooses how to track active windows and clear out expired windows - * according to the {@link WindowingStrategy}, based on the allowed lateness and - * whether windows can merge.</li> - * <li>It decides whether to emit empty final panes according to whether the - * {@link WindowingStrategy} requires it.<li> - * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li> + * <li>It runs the trigger from the {@link WindowingStrategy}.</li> + * <li>It merges windows according to the {@link WindowingStrategy}.</li> + * <li>It chooses how to track active windows and clear out expired windows + * according to the {@link WindowingStrategy}, based on the allowed lateness and + * whether windows can merge.</li> + * <li>It decides whether to emit empty final panes according to whether the + * {@link WindowingStrategy} requires it.<li> + * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li> * </ul> */ private final WindowingStrategy<Object, W> windowingStrategy; @@ -103,11 +106,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { private final K key; - private final OnMergeCallback onMergeCallback = new OnMergeCallback(); - /** - * Track which windows are still active and which 'state address' windows contain state - * for a merged window. + * Track which windows are still active and the 'state address' windows which hold their state. * * <ul> * <li>State: Global map for all active windows for this computation and key. 
@@ -251,7 +251,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @VisibleForTesting boolean hasNoActiveWindows() { - return activeWindows.getActiveWindows().isEmpty(); + return activeWindows.getActiveAndNewWindows().isEmpty(); } /** @@ -260,34 +260,34 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * * <p>The general strategy is: * <ol> - * <li>Use {@link WindowedValue#getWindows} (itself determined using - * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some - * of those windows will already have state associated with them. The rest are considered - * NEW. - * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows. - * Each NEW window will become either ACTIVE, MERGED, or EPHEMERAL. (See {@link - * ActiveWindowSet} for definitions of these terms.) - * <li>If at all possible, eagerly substitute EPHEMERAL windows with their ACTIVE state address - * windows before any state is associated with the EPHEMERAL window. In the common case that - * windows for new elements are merged into existing ACTIVE windows then no additional - * storage or merging overhead will be incurred. - * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their - * states can be merged on-demand when a pane fires. - * <li>Process the element for each of the windows it's windows have been merged into according - * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers, - * setting holds, and invoking {@link ReduceFn#onTrigger}. + * <li>Use {@link WindowedValue#getWindows} (itself determined using + * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some + * of those windows will already have state associated with them. The rest are considered + * NEW. + * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows. 
+ * Each NEW window will become either ACTIVE or be discarded. + * (See {@link ActiveWindowSet} for definitions of these terms.) + * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address + * windows before any state is associated with the NEW window. In the common case that + * windows for new elements are merged into existing ACTIVE windows then no additional + * storage or merging overhead will be incurred. + * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their + * states can be merged on-demand when a pane fires. + * <li>Process the element for each of the windows its windows have been merged into according + * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers, + * setting holds, and invoking {@link ReduceFn#onTrigger}. * </ol> */ public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception { // If an incoming element introduces a new window, attempt to merge it into an existing - // window eagerly. The outcome is stored in the ActiveWindowSet. - collectAndMergeWindows(values); + // window eagerly. + Map<W, W> windowToMergeResult = collectAndMergeWindows(values); Set<W> windowsToConsider = new HashSet<>(); // Process each element, using the updated activeWindows determined by collectAndMergeWindows. for (WindowedValue<InputT> value : values) { - windowsToConsider.addAll(processElement(value)); + windowsToConsider.addAll(processElement(windowToMergeResult, value)); } // Trigger output from any window for which the trigger is ready @@ -303,8 +303,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // We're all done with merging and emitting elements so can compress the activeWindow state. // Any windows which are still NEW must have come in on a new element which was then discarded // due to the window's trigger being closed. We can thus delete them. 
- // Any windows which are EPHEMERAL must have come in on a new element but been merged away - // into some other ACTIVE window. We can thus also delete them. activeWindows.cleanupTemporaryWindows(); } @@ -313,12 +311,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } /** - * Extract the windows associated with the values, and invoke merge. + * Extract the windows associated with the values, and invoke merge. Return a map + * from windows to the merge result window. If a window is not in the domain of + * the result map then it did not get merged into a different window. */ - private void collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) throws Exception { + private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) + throws Exception { // No-op if no merging can take place if (windowingStrategy.getWindowFn().isNonMerging()) { - return; + return ImmutableMap.of(); } // Collect the windows from all elements (except those which are too late) and @@ -329,6 +330,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { W window = (W) untypedWindow; // For backwards compat with pre 1.4 only. + // We may still have ACTIVE windows with multiple state addresses, representing + // a window whose state has not yet been eagerly merged. + // We'll go ahead and merge that state now so that we don't have to worry about + // this legacy case anywhere else. if (activeWindows.isActive(window)) { Set<W> stateAddressWindows = activeWindows.readStateAddresses(window); if (stateAddressWindows.size() > 1) { @@ -342,19 +347,41 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } - // Add this window as NEW if it is not currently ACTIVE or MERGED. + // Add this window as NEW if it is not currently ACTIVE. // If we had already seen this window and closed its trigger, then the - // window will not be ACTIVE or MERGED. 
It will then be added as NEW here, + // window will not be currently ACTIVE. It will then be added as NEW here, // and fall into the merging logic as usual. activeWindows.ensureWindowExists(window); } } // Merge all of the active windows and retain a mapping from source windows to result windows. - mergeActiveWindows(); + Map<W, W> windowToMergeResult = new HashMap<>(); + activeWindows.merge(new OnMergeCallback(windowToMergeResult)); + return windowToMergeResult; } private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> { + private final Map<W, W> windowToMergeResult; + + OnMergeCallback(Map<W, W> windowToMergeResult) { + this.windowToMergeResult = windowToMergeResult; + } + + /** + * Return the subset of {@code windows} which are currently ACTIVE. We only need to worry + * about merging state from ACTIVE windows. NEW windows by definition have no existing state. + */ + private List<W> activeWindows(Iterable<W> windows) { + List<W> active = new ArrayList<>(); + for (W window : windows) { + if (activeWindows.isActive(window)) { + active.add(window); + } + } + return active; + } + /** * Called from the active window set to indicate {@code toBeMerged} (of which only * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later @@ -362,7 +389,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { */ @Override public void prefetchOnMerge( - Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult) throws Exception { + Collection<W> toBeMerged, W mergeResult) throws Exception { + List<W> activeToBeMerged = activeWindows(toBeMerged); ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext = contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT); ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext = @@ -381,9 +409,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * to be merged into {@code mergeResult}. 
*/ @Override - public void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult) - throws Exception { + public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + // Remember we have merged these windows. + for (W window : toBeMerged) { + windowToMergeResult.put(window, mergeResult); + } + // At this point activeWindows has NOT incorporated the results of the merge. + List<W> activeToBeMerged = activeWindows(toBeMerged); ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext = contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT); ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext = @@ -407,7 +440,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // Not merged away. continue; } - // Cleanup flavor A: Currently ACTIVE window is about to become MERGED. + // Cleanup flavor A: Currently ACTIVE window is about to be merged away. // Clear any state not already cleared by the onMerge calls above. WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult); ReduceFn<K, InputT, OutputT, W>.Context directClearContext = @@ -424,17 +457,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } - private void mergeActiveWindows() throws Exception { - activeWindows.merge(onMergeCallback); - } - /** * Process an element. - * @param value the value being processed * + * @param value the value being processed * @return the set of windows in which the element was actually processed */ - private Collection<W> processElement(WindowedValue<InputT> value) throws Exception { + private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value) + throws Exception { // Redirect element windows to the ACTIVE windows they have been merged into. 
// The compressed representation (value, {window1, window2, ...}) actually represents // distinct elements (value, window1), (value, window2), ... @@ -444,12 +474,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { for (BoundedWindow untypedWindow : value.getWindows()) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; - - ReduceFn<K, InputT, OutputT, W>.Context directContext = - contextFactory.base(window, StateStyle.DIRECT); - W active = activeWindows.mergeResultWindow(window); - Preconditions.checkState(active != null, "Window %s has no mergeResultWindow", window); - windows.add(active); + W mergeResult = windowToMergeResult.get(window); + if (mergeResult == null) { + mergeResult = window; + } + windows.add(mergeResult); } // Prefetch in each of the windows if we're going to need to process triggers @@ -464,7 +493,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { for (W window : windows) { ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); - if (triggerRunner.isClosed(directContext.state())) { // This window has already been closed. droppedDueToClosedWindow.addValue(1L); @@ -478,9 +506,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { triggerableWindows.add(window); activeWindows.ensureWindowIsActive(window); - ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); + nonEmptyPanes.recordContent(renamedContext.state()); // Make sure we've scheduled the end-of-window or garbage collection timer for this window. @@ -544,10 +572,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // - If the window function does not support windowing then all windows will be considered // active. 
// So we must take conjunction of activeWindows and triggerRunner state. - boolean windowIsActive = + boolean windowIsActiveAndOpen = activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state()); - if (!windowIsActive) { + if (!windowIsActiveAndOpen) { WindowTracing.debug( "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } @@ -568,7 +596,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - if (windowIsActive) { + if (windowIsActiveAndOpen) { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. @@ -577,14 +605,14 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // Cleanup flavor B: Clear all the remaining state for this window since we'll never // see elements for it again. 
- clearAllState(directContext, renamedContext, windowIsActive); + clearAllState(directContext, renamedContext, windowIsActiveAndOpen); } else { WindowTracing.debug( "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - if (windowIsActive) { + if (windowIsActiveAndOpen) { emitIfAppropriate(directContext, renamedContext); } @@ -625,9 +653,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { private void clearAllState( ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext, - boolean windowIsActive) - throws Exception { - if (windowIsActive) { + boolean windowIsActiveAndOpen) + throws Exception { + if (windowIsActiveAndOpen) { // Since both the window is in the active window set AND the trigger was not yet closed, // it is possible we still have state. reduceFn.clearState(renamedContext); @@ -649,10 +677,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } paneInfoTracker.clear(directContext.state()); - if (activeWindows.isActive(directContext.window())) { - // Don't need to track address state windows anymore. - activeWindows.remove(directContext.window()); - } + // Don't need to track address state windows anymore. + activeWindows.remove(directContext.window()); // We'll never need to test for the trigger being closed again. 
triggerRunner.clearFinished(directContext.state()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java index 7ab8322..84699d6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -39,6 +40,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Test NonMergingActiveWindowSet. 
@@ -48,12 +52,17 @@ public class MergingActiveWindowSetTest { private Sessions windowFn; private StateInternals<String> state; private MergingActiveWindowSet<IntervalWindow> set; + private ActiveWindowSet.MergeCallback<IntervalWindow> callback; @Before - public void before() { + public void setup() { windowFn = Sessions.withGapDuration(Duration.millis(10)); state = InMemoryStateInternals.forKey("dummyKey"); set = new MergingActiveWindowSet<>(windowFn, state); + @SuppressWarnings("unchecked") + ActiveWindowSet.MergeCallback<IntervalWindow> + callback = mock(ActiveWindowSet.MergeCallback.class); + this.callback = callback; } @After @@ -63,42 +72,78 @@ public class MergingActiveWindowSetTest { windowFn = null; } - private void add(final long instant) { - final Object element = new Long(instant); - Sessions.AssignContext context = windowFn.new AssignContext() { - @Override - public Object element() { - return element; - } + private void add(long... instants) { + for (final long instant : instants) { + System.out.println("ADD " + instant); + Sessions.AssignContext context = windowFn.new AssignContext() { + @Override + public Object element() { + return (Object) instant; + } - @Override - public Instant timestamp() { - return new Instant(instant); - } + @Override + public Instant timestamp() { + return new Instant(instant); + } - @Override - public Collection<? extends BoundedWindow> windows() { - return ImmutableList.of(); - } - }; + @Override + public Collection<? 
extends BoundedWindow> windows() { + return ImmutableList.of(); + } + }; - for (IntervalWindow window : windowFn.assignWindows(context)) { - set.ensureWindowExists(window); + for (IntervalWindow window : windowFn.assignWindows(context)) { + set.ensureWindowExists(window); + } } } - private void merge(ActiveWindowSet.MergeCallback<IntervalWindow> callback) throws Exception { + private Map<IntervalWindow, IntervalWindow> merge( + List<IntervalWindow> toBeMerged, + IntervalWindow mergeResult) throws Exception { + IntervalWindow predictedPostMergeWriteStateAddress = + set.mergedWriteStateAddress(toBeMerged, mergeResult); + + System.out.println("BEFORE MERGE"); + System.out.println(set); + Map<IntervalWindow, IntervalWindow> map = new HashMap<>(); + for (IntervalWindow window : toBeMerged) { + System.out.println("WILL MERGE " + window + " INTO " + mergeResult); + map.put(window, mergeResult); + } + System.out.println("AFTER MERGE"); set.merge(callback); - for (IntervalWindow window : set.getActiveWindows()) { - set.ensureWindowIsActive(window); + verify(callback).onMerge(toBeMerged, mergeResult); + System.out.println(set); + + assertEquals(predictedPostMergeWriteStateAddress, set.writeStateAddress(mergeResult)); + + return map; + } + + private void activate(Map<IntervalWindow, IntervalWindow> map, long... 
instants) { + for (long instant : instants) { + IntervalWindow window = window(instant, 10); + IntervalWindow active = map.get(window); + if (active == null) { + active = window; + } + System.out.println("ACTIVATE " + active); + set.ensureWindowIsActive(active); } set.checkInvariants(); } - private void pruneAndPersist() { + private void cleanup() { + System.out.println("CLEANUP"); set.cleanupTemporaryWindows(); set.checkInvariants(); + System.out.println(set); set.persist(); + MergingActiveWindowSet<IntervalWindow> reloaded = + new MergingActiveWindowSet<>(windowFn, state); + reloaded.checkInvariants(); + assertEquals(set, reloaded); } private IntervalWindow window(long start, long size) { @@ -106,70 +151,115 @@ public class MergingActiveWindowSetTest { } @Test - public void test() throws Exception { - @SuppressWarnings("unchecked") - ActiveWindowSet.MergeCallback<IntervalWindow> callback = - mock(ActiveWindowSet.MergeCallback.class); - + public void testLifecycle() throws Exception { + // Step 1: New elements show up, introducing NEW windows which are partially merged. 
// NEW 1+10 // NEW 2+10 // NEW 15+10 // => // ACTIVE 1+11 (target 1+11) - // EPHEMERAL 1+10 -> 1+11 - // EPHEMERAL 2+10 -> 1+11 // ACTIVE 15+10 (target 15+10) - add(1); - add(2); - add(15); - merge(callback); - verify(callback).onMerge(ImmutableList.of(window(1, 10), window(2, 10)), - ImmutableList.<IntervalWindow>of(), window(1, 11)); - assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveWindows()); - assertEquals(window(1, 11), set.mergeResultWindow(window(1, 10))); - assertEquals(window(1, 11), set.mergeResultWindow(window(2, 10))); - assertEquals(window(1, 11), set.mergeResultWindow(window(1, 11))); - assertEquals(window(15, 10), set.mergeResultWindow(window(15, 10))); + add(1, 2, 15); + assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(15, 10)), + set.getActiveAndNewWindows()); + Map<IntervalWindow, IntervalWindow> map = + merge(ImmutableList.of(window(1, 10), window(2, 10)), + window(1, 11)); + activate(map, 1, 2, 15); + assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveAndNewWindows()); assertEquals( - ImmutableSet.<IntervalWindow>of(window(1, 11)), set.readStateAddresses(window(1, 11))); + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 11))); assertEquals( - ImmutableSet.<IntervalWindow>of(window(15, 10)), set.readStateAddresses(window(15, 10))); + ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10))); + cleanup(); + // Step 2: Another element, merged into an existing ACTIVE window. 
// NEW 3+10 // => // ACTIVE 1+12 (target 1+11) - // EPHEMERAL 3+10 -> 1+12 // ACTIVE 15+10 (target 15+10) add(3); - merge(callback); - verify(callback).onMerge(ImmutableList.of(window(1, 11), window(3, 10)), - ImmutableList.<IntervalWindow>of(window(1, 11)), window(1, 12)); - assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveWindows()); - assertEquals(window(1, 12), set.mergeResultWindow(window(3, 10))); + assertEquals(ImmutableSet.of(window(3, 10), window(1, 11), window(15, 10)), + set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 11), window(3, 10)), + window(1, 12)); + activate(map, 3); + assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 12))); + assertEquals( + ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10))); + cleanup(); + // Step 3: Another element, causing two ACTIVE windows to be merged. 
// NEW 8+10 // => - // ACTIVE 1+24 (target 1+11, 15+10) - // MERGED 1+11 -> 1+24 - // MERGED 15+10 -> 1+24 - // EPHEMERAL 1+12 -> 1+24 + // ACTIVE 1+24 (target 1+11) add(8); - merge(callback); - verify(callback).onMerge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)), - ImmutableList.<IntervalWindow>of(window(1, 12), window(15, 10)), window(1, 24)); - assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveWindows()); - assertEquals(window(1, 24), set.mergeResultWindow(window(1, 12))); - assertEquals(window(1, 24), set.mergeResultWindow(window(1, 11))); - assertEquals(window(1, 24), set.mergeResultWindow(window(15, 10))); + assertEquals(ImmutableSet.of(window(8, 10), window(1, 12), window(15, 10)), + set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)), + window(1, 24)); + activate(map, 8); + assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); + cleanup(); + // Step 4: Another element, merged into an existing ACTIVE window. // NEW 9+10 // => - // ACTIVE 1+24 (target 1+11, 15+10) + // ACTIVE 1+24 (target 1+11) add(9); - merge(callback); - verify(callback).onMerge(ImmutableList.of(window(1, 24), window(9, 10)), - ImmutableList.<IntervalWindow>of(window(1, 24)), window(1, 24)); + assertEquals(ImmutableSet.of(window(9, 10), window(1, 24)), set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 24), window(9, 10)), + window(1, 24)); + activate(map, 9); + assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); + cleanup(); + + // Step 5: Another element reusing earlier window, merged into an existing ACTIVE window. 
+ // NEW 1+10 + // => + // ACTIVE 1+24 (target 1+11) + add(1); + assertEquals(ImmutableSet.of(window(1, 10), window(1, 24)), set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 10), window(1, 24)), + window(1, 24)); + activate(map, 1); + assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); + cleanup(); + + // Step 6: Window is closed. + set.remove(window(1, 24)); + cleanup(); + assertTrue(set.getActiveAndNewWindows().isEmpty()); + } + + @Test + public void testLegacyState() { + // Pre 1.4 we merged window state lazily. + // Simulate loading an active window set with multiple state address windows. + set.addActiveForTesting(window(1, 12), + ImmutableList.of(window(1, 10), window(2, 10), window(3, 10))); + + + // Make sure we can detect and repair the state. + assertTrue(set.isActive(window(1, 12))); + assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(3, 10)), + set.readStateAddresses(window(1, 12))); + assertEquals(window(1, 10), + set.mergedWriteStateAddress( + ImmutableList.of(window(1, 10), window(2, 10), window(3, 10)), + window(1, 12))); + set.merged(window(1, 12)); + cleanup(); - pruneAndPersist(); + // From then on we are back to the eager case. 
+ assertEquals(ImmutableSet.of(window(1, 10)), set.readStateAddresses(window(1, 12))); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d534ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index 0889b4f..a1e376e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -50,6 +50,7 @@ import com.google.common.collect.Maps; import org.joda.time.Duration; import org.joda.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -104,6 +105,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { private final TriggerContextFactory<W> contextFactory; private final WindowFn<Object, W> windowFn; private final ActiveWindowSet<W> activeWindows; + private final Map<W, W> windowToMergeResult; /** * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link TriggerBuilder} @@ -155,6 +157,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { windowFn.isNonMerging() ? new NonMergingActiveWindowSet<W>() : new MergingActiveWindowSet<W>(windowFn, stateInternals); + this.windowToMergeResult = new HashMap<>(); this.contextFactory = new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows); @@ -245,7 +248,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE))); for (W window : assignedWindows) { - activeWindows.addActive(window); + activeWindows.addActiveForTesting(window); // Today, triggers assume onTimer firing at the watermark time, whether or not they // explicitly set the timer themselves. 
So this tester must set it. @@ -263,7 +266,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { for (BoundedWindow untypedWindow : windowedValue.getWindows()) { // SDK is responsible for type safety @SuppressWarnings("unchecked") - W window = activeWindows.mergeResultWindow((W) untypedWindow); + W window = mergeResult((W) untypedWindow); Trigger.OnElementContext context = contextFactory.createOnElementContext(window, new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), @@ -312,14 +315,20 @@ public class TriggerTester<InputT, W extends BoundedWindow> { * since it is just to test the trigger's {@code OnMerge} method. */ public final void mergeWindows() throws Exception { + windowToMergeResult.clear(); activeWindows.merge(new MergeCallback<W>() { @Override - public void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, - W mergeResult) throws Exception {} + public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {} @Override - public void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult) - throws Exception { + public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + List<W> activeToBeMerged = new ArrayList<W>(); + for (W window : toBeMerged) { + windowToMergeResult.put(window, mergeResult); + if (activeWindows.isActive(window)) { + activeToBeMerged.add(window); + } + } Map<W, FinishedTriggers> mergingFinishedSets = Maps.newHashMapWithExpectedSize(activeToBeMerged.size()); for (W oldWindow : activeToBeMerged) { @@ -334,6 +343,11 @@ public class TriggerTester<InputT, W extends BoundedWindow> { }); } + public W mergeResult(W window) { + W result = windowToMergeResult.get(window); + return result == null ? window : result; + } + private FinishedTriggers getFinishedSet(W window) { FinishedTriggers finishedSet = finishedSets.get(window); if (finishedSet == null) {