Move ActiveWindowSet and implementations to runners-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bcf02986 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bcf02986 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bcf02986 Branch: refs/heads/master Commit: bcf02986df5d7831bb6fbe4c304bef6857e395f3 Parents: cc662d6 Author: Kenneth Knowles <k...@google.com> Authored: Mon Dec 12 20:18:02 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Dec 16 13:48:37 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/ActiveWindowSet.java | 173 ++++++++ .../runners/core/MergingActiveWindowSet.java | 428 +++++++++++++++++++ .../runners/core/NonMergingActiveWindowSet.java | 91 ++++ .../runners/core/ReduceFnContextFactory.java | 1 - .../beam/runners/core/ReduceFnRunner.java | 3 - .../TriggerStateMachineContextFactory.java | 2 +- .../core/MergingActiveWindowSetTest.java | 262 ++++++++++++ .../triggers/TriggerStateMachineTester.java | 8 +- .../apache/beam/sdk/util/ActiveWindowSet.java | 173 -------- .../beam/sdk/util/MergingActiveWindowSet.java | 428 ------------------- .../sdk/util/NonMergingActiveWindowSet.java | 91 ---- .../sdk/util/MergingActiveWindowSetTest.java | 262 ------------ 12 files changed, 959 insertions(+), 963 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java new file mode 100644 index 0000000..79d1f3f --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java @@ 
-0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+/**
+ * Track which windows are <i>active</i>, and the <i>state address window(s)</i> under which their
+ * state is stored. Also help with the multi-step process of merging windows and their associated
+ * state.
+ *
+ * <p>When windows are merged we must also merge their state. For example, we may need to
+ * concatenate buffered elements, sum a count of elements, or find a new minimum timestamp.
+ * If we start with two windows {@code Wa} and {@code Wb} and later discover they should be
+ * merged into window {@code Wab} then, naively, we must copy and merge the states of {@code Wa}
+ * and {@code Wb} into {@code Wab}.
+ *
+ * <p>However, the common case for merging windows is for a new window to be merged into an
+ * existing window. Thus, if {@code Wa} is the existing window and {@code Wb} the new window it
+ * is more efficient to leave the state for {@code Wa} where it is, and simply redirect {@code
+ * Wab} to it. In this case we say {@code Wab} has a state address window of {@code Wa}.
+ *
+ * <p>Even if windows {@code Wa} and {@code Wb} already have state, it can still be more efficient
+ * to append the state of {@code Wb} onto {@code Wa} rather than copy the state from {@code Wa}
+ * and {@code Wb} into {@code Wab}.
+ *
+ * <p>We use the following terminology for windows:
+ * <ol>
+ * <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged
+ * away. The window may have one (or more) state address windows under which its
+ * non-empty state is stored. A state value for an ACTIVE window must be derived by reading
+ * the state in (all of) its state address windows. Note that only pre 1.4 pipelines
+ * use multiple state address windows per active window. From 1.4 onwards we eagerly merge
+ * window state into a single state address window.
+ * <li><b>NEW</b>: The initial state for a window of an incoming element which is not
+ * already ACTIVE. We have not yet called {@link WindowFn#mergeWindows}, and so don't yet know
+ * whether the window will be merged into another NEW or ACTIVE window, or will
+ * become an ACTIVE window in its own right.
+ * </ol>
+ *
+ * <p>If no windows will ever be merged we can use the trivial implementation {@link
+ * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this data structure is in
+ * {@link MergingActiveWindowSet}.
+ *
+ * @param <W> the type of window being managed
+ */
+public interface ActiveWindowSet<W extends BoundedWindow> {
+  /**
+   * Callback for {@link #merge}.
+   */
+  public interface MergeCallback<W extends BoundedWindow> {
+    /**
+     * Called when windows are about to be merged, but before any {@link #onMerge} callback
+     * has been made.
+     *
+     * @param toBeMerged the windows about to be merged.
+     * @param mergeResult the result window, either a member of {@code toBeMerged} or new.
+     */
+    void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception;
+
+    /**
+     * Called when windows are about to be merged, after all {@link #prefetchOnMerge} calls
+     * have been made, but before the active window set has been updated to reflect the merge.
+     *
+     * @param toBeMerged the windows about to be merged.
+     * @param mergeResult the result window, either a member of {@code toBeMerged} or new.
+     */
+    void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception;
+  }
+
+  /**
+   * Remove any remaining NEW windows since they were not promoted to being ACTIVE
+   * by {@link #ensureWindowIsActive} and we don't need to record anything about them.
+   */
+  void cleanupTemporaryWindows();
+
+  /**
+   * Save any state changes needed.
+   */
+  void persist();
+
+  /**
+   * Return (a view of) the set of currently ACTIVE and NEW windows.
+   */
+  Set<W> getActiveAndNewWindows();
+
+  /**
+   * Return {@code true} if {@code window} is ACTIVE.
+   */
+  boolean isActive(W window);
+
+  /**
+   * Return {@code true} if {@code window} is ACTIVE or NEW.
+   */
+  boolean isActiveOrNew(W window);
+
+  /**
+   * Called when an incoming element indicates it is a member of {@code window}, but before we
+   * have started processing that element. If {@code window} is not already known to be ACTIVE,
+   * then add it as NEW.
+   */
+  void ensureWindowExists(W window);
+
+  /**
+   * Called when a NEW or ACTIVE window is now known to be ACTIVE.
+   * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address
+   * window).
+   */
+  void ensureWindowIsActive(W window);
+
+  /**
+   * If {@code window} is not already known to be ACTIVE then add it as ACTIVE.
+   * For testing only.
+   */
+  @VisibleForTesting
+  void addActiveForTesting(W window);
+
+  /**
+   * Remove {@code window} from the set.
+   */
+  void remove(W window);
+
+  /**
+   * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated with this window set,
+   * merging as many of the NEW and ACTIVE windows as possible. {@code mergeCallback} will be
+   * invoked for each group of windows that are merged. After this all merge result windows will
+   * be ACTIVE, and all windows which have been merged away will be neither ACTIVE nor NEW.
+   */
+  void merge(MergeCallback<W> mergeCallback) throws Exception;
+
+  /**
+   * Signal that all state in {@link #readStateAddresses} for {@code window} has been merged into
+   * the {@link #writeStateAddress} for {@code window}.
+   */
+  void merged(W window);
+
+  /**
+   * Return the state address windows for ACTIVE {@code window} from which all state associated
+   * should be read and merged.
+   */
+  Set<W> readStateAddresses(W window);
+
+  /**
+   * Return the state address window of ACTIVE {@code window} into which all new state should be
+   * written. Always one of the results of {@link #readStateAddresses}.
+   */
+  W writeStateAddress(W window);
+
+  /**
+   * Return the state address window into which all new state should be written after
+   * ACTIVE windows {@code toBeMerged} have been merged into {@code mergeResult}.
+   */
+  W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
new file mode 100644
index 0000000..720377a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+
+/**
+ * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
+ */
+public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
+  private final WindowFn<Object, W> windowFn;
+
+  /**
+   * Map ACTIVE and NEW windows to their state address windows. Persisted.
+   * <ul>
+   * <li>A NEW window has the empty set as its value.
+   * <li>An ACTIVE window has its (typically singleton) set of state address windows as
+   * its value.
+   * </ul>
+   */
+  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
+
+  /**
+   * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
+   * Used to avoid writing to state if no changes have been made during the work unit.
+   */
+  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
+
+  /**
+   * Handle representing our state in the backend.
+   */
+  private final ValueState<Map<W, Set<W>>> valueState;
+
+  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) {
+    this.windowFn = windowFn;
+
+    StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
+        StateTags.makeSystemTagInternal(StateTags.value(
+            "tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder()))));
+    valueState = state.state(StateNamespaces.global(), tag);
+    // Little use trying to prefetch this state since the ReduceFnRunner
+    // is stymied until it is available.
+    activeWindowToStateAddressWindows = emptyIfNull(valueState.read());
+    originalActiveWindowToStateAddressWindows = deepCopy(activeWindowToStateAddressWindows);
+  }
+
+  @Override
+  public void cleanupTemporaryWindows() {
+    // All NEW windows can be forgotten since they must have ended up being merged into
+    // some other ACTIVE window.
+    Iterator<Map.Entry<W, Set<W>>> iter = activeWindowToStateAddressWindows.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<W, Set<W>> entry = iter.next();
+      if (entry.getValue().isEmpty()) {
+        iter.remove();
+      }
+    }
+  }
+
+  @Override
+  public void persist() {
+    checkInvariants();
+    if (activeWindowToStateAddressWindows.isEmpty()) {
+      // Force all persistent state to disappear.
+      valueState.clear();
+      return;
+    }
+    if (activeWindowToStateAddressWindows.equals(originalActiveWindowToStateAddressWindows)) {
+      // No change.
+      return;
+    }
+    valueState.write(activeWindowToStateAddressWindows);
+    // No need to update originalActiveWindowToStateAddressWindows since this object is about to
+    // become garbage.
+  }
+
+  @Override
+  public Set<W> getActiveAndNewWindows() {
+    return activeWindowToStateAddressWindows.keySet();
+  }
+
+  @Override
+  public boolean isActive(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    return stateAddressWindows != null && !stateAddressWindows.isEmpty();
+  }
+
+  @Override
+  public boolean isActiveOrNew(W window) {
+    return activeWindowToStateAddressWindows.containsKey(window);
+  }
+
+  @Override
+  public void ensureWindowExists(W window) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      // Add window as NEW.
+      activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
+    }
+  }
+
+  @Override
+  public void ensureWindowIsActive(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null,
+        "Cannot ensure window %s is active since it is neither ACTIVE nor NEW", window);
+    if (stateAddressWindows != null && stateAddressWindows.isEmpty()) {
+      // Window was NEW, make it ACTIVE with itself as its state address window.
+      stateAddressWindows.add(window);
+    }
+  }
+
+  @Override
+  @VisibleForTesting
+  public void addActiveForTesting(W window) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      // Make window ACTIVE with itself as its state address window.
+      Set<W> stateAddressWindows = new LinkedHashSet<>();
+      stateAddressWindows.add(window);
+      activeWindowToStateAddressWindows.put(window, stateAddressWindows);
+    }
+  }
+
+  @VisibleForTesting
+  public void addActiveForTesting(W window, Iterable<W> stateAddressWindows) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      activeWindowToStateAddressWindows.put(window, Sets.newLinkedHashSet(stateAddressWindows));
+    }
+  }
+
+  @Override
+  public void remove(W window) {
+    activeWindowToStateAddressWindows.remove(window);
+  }
+
+  /** Buffers merges requested by {@link WindowFn#mergeWindows} until {@link #recordMerges}. */
+  private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
+    private final MergeCallback<W> mergeCallback;
+    private final List<Collection<W>> allToBeMerged;
+    private final List<W> allMergeResults;
+    private final Set<W> seen;
+
+    public MergeContextImpl(MergeCallback<W> mergeCallback) {
+      windowFn.super();
+      this.mergeCallback = mergeCallback;
+      allToBeMerged = new ArrayList<>();
+      allMergeResults = new ArrayList<>();
+      seen = new HashSet<>();
+    }
+
+    @Override
+    public Collection<W> windows() {
+      return activeWindowToStateAddressWindows.keySet();
+    }
+
+    @Override
+    public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+      // The arguments have come from userland.
+      checkNotNull(toBeMerged);
+      checkNotNull(mergeResult);
+      List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size());
+      boolean includesMergeResult = false;
+      for (W window : toBeMerged) {
+        checkNotNull(window);
+        checkState(isActiveOrNew(window), "Expecting merge window %s to be ACTIVE or NEW", window);
+        if (window.equals(mergeResult)) {
+          includesMergeResult = true;
+        }
+        boolean notDup = seen.add(window);
+        checkState(notDup, "Expecting merge window %s to appear in at most one merge set", window);
+        copyOfToBeMerged.add(window);
+      }
+      if (!includesMergeResult) {
+        checkState(!isActive(mergeResult), "Expecting result window %s to be NEW", mergeResult);
+      }
+      allToBeMerged.add(copyOfToBeMerged);
+      allMergeResults.add(mergeResult);
+    }
+
+    public void recordMerges() throws Exception {
+      for (int i = 0; i < allToBeMerged.size(); i++) {
+        mergeCallback.prefetchOnMerge(allToBeMerged.get(i), allMergeResults.get(i));
+      }
+      for (int i = 0; i < allToBeMerged.size(); i++) {
+        mergeCallback.onMerge(allToBeMerged.get(i), allMergeResults.get(i));
+        recordMerge(allToBeMerged.get(i), allMergeResults.get(i));
+      }
+      allToBeMerged.clear();
+      allMergeResults.clear();
+      seen.clear();
+    }
+  }
+
+  @Override
+  public void merge(MergeCallback<W> mergeCallback) throws Exception {
+    MergeContextImpl context = new MergeContextImpl(mergeCallback);
+
+    // See what the window function does with the NEW and already ACTIVE windows.
+    // Entering userland.
+    windowFn.mergeWindows(context);
+
+    // Actually do the merging and invoke the callbacks.
+    context.recordMerges();
+  }
+
+  /**
+   * A {@link WindowFn#mergeWindows} call has determined that {@code toBeMerged} (which must
+   * all be ACTIVE or NEW) should be considered equivalent to {@code mergeResult} (which is either
+   * a member of {@code toBeMerged} or is a new window). Make the corresponding change in
+   * the active window set.
+   */
+  private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+    // Note that mergedWriteStateAddress must predict the result of writeStateAddress
+    // after the corresponding merge has been applied.
+    // Thus we must ensure the first state address window in the merged result here is
+    // deterministic.
+    // Thus we use a linked hash set.
+    Set<W> newStateAddressWindows = new LinkedHashSet<>();
+    Set<W> existingStateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult);
+    if (existingStateAddressWindows != null) {
+      // Preserve all the existing state address windows for mergeResult.
+      newStateAddressWindows.addAll(existingStateAddressWindows);
+    }
+
+    for (W other : toBeMerged) {
+      Set<W> otherStateAddressWindows = activeWindowToStateAddressWindows.get(other);
+      checkState(otherStateAddressWindows != null, "Window %s is not ACTIVE or NEW", other);
+
+      if (otherStateAddressWindows != null) {
+        for (W otherStateAddressWindow : otherStateAddressWindows) {
+          // Since otherStateAddressWindow equiv other AND other equiv mergeResult
+          // THEN otherStateAddressWindow equiv mergeResult.
+          newStateAddressWindows.add(otherStateAddressWindow);
+        }
+      }
+      activeWindowToStateAddressWindows.remove(other);
+
+      // Now other equiv mergeResult.
+    }
+
+    if (newStateAddressWindows.isEmpty()) {
+      // If newStateAddressWindows is empty then toBeMerged must have only contained NEW windows.
+      // Promote mergeResult to be ACTIVE now.
+      newStateAddressWindows.add(mergeResult);
+    }
+
+    activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows);
+
+    merged(mergeResult);
+  }
+
+  @Override
+  public void merged(W window) {
+    // Take just the first state address window; a NEW window has none and must be rejected.
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
+    W first = Iterables.getFirst(stateAddressWindows, null);
+    checkState(first != null, "Window %s is still NEW", window);
+    stateAddressWindows.clear();
+    stateAddressWindows.add(first);
+  }
+
+  /**
+   * Return the state address windows for ACTIVE {@code window} from which all state associated
+   * should be read and merged.
+   */
+  @Override
+  public Set<W> readStateAddresses(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
+    return stateAddressWindows;
+  }
+
+  /**
+   * Return the state address window of ACTIVE {@code window} into which all new state should be
+   * written.
+   */
+  @Override
+  public W writeStateAddress(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
+    W result = Iterables.getFirst(stateAddressWindows, null);
+    checkState(result != null, "Window %s is still NEW", window);
+    return result;
+  }
+
+  @Override
+  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult);
+    if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
+      return Iterables.getFirst(stateAddressWindows, null);
+    }
+    for (W mergedWindow : toBeMerged) {
+      stateAddressWindows = activeWindowToStateAddressWindows.get(mergedWindow);
+      if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
+        return Iterables.getFirst(stateAddressWindows, null);
+      }
+    }
+    return mergeResult;
+  }
+
+  /** Checks that no window is NEW and that state address window sets are pairwise disjoint. */
+  @VisibleForTesting
+  public void checkInvariants() {
+    Set<W> knownStateAddressWindows = new HashSet<>();
+    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
+      W active = entry.getKey();
+      checkState(!entry.getValue().isEmpty(),
+          "Unexpected empty state address window set for ACTIVE window %s",
+          active);
+      for (W stateAddressWindow : entry.getValue()) {
+        checkState(knownStateAddressWindows.add(stateAddressWindow),
+            "%s is in more than one state address window set",
+            stateAddressWindow);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("MergingActiveWindowSet {\n");
+    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
+      W active = entry.getKey();
+      Set<W> stateAddressWindows = entry.getValue();
+      if (stateAddressWindows.isEmpty()) {
+        sb.append(" NEW ");
+        sb.append(active);
+        sb.append('\n');
+      } else {
+        sb.append(" ACTIVE ");
+        sb.append(active);
+        sb.append(":\n");
+        for (W stateAddressWindow : stateAddressWindows) {
+          sb.append(" ");
+          sb.append(stateAddressWindow);
+          sb.append("\n");
+        }
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof MergingActiveWindowSet)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    MergingActiveWindowSet<W> other = (MergingActiveWindowSet<W>) o;
+
+    return activeWindowToStateAddressWindows.equals(other.activeWindowToStateAddressWindows);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(activeWindowToStateAddressWindows);
+  }
+
+  /**
+   * Replace null {@code multimap} with empty map, and replace null entries in {@code multimap}
+   * with empty sets.
+   */
+  private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap) {
+    if (multimap == null) {
+      return new HashMap<>();
+    } else {
+      for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
+        if (entry.getValue() == null) {
+          entry.setValue(new LinkedHashSet<W>());
+        }
+      }
+      return multimap;
+    }
+  }
+
+  /**
+   * Return a deep copy of {@code multimap}.
+   */
+  private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
+    Map<W, Set<W>> newMultimap = new HashMap<>();
+    for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
+      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
+    }
+    return newMultimap;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
new file mode 100644
index 0000000..fec6c45
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+/**
+ * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't
+ * support merging.
+ *
+ * @param <W> the type of window being managed
+ */
+public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
+  @Override
+  public void cleanupTemporaryWindows() {}
+
+  @Override
+  public void persist() {}
+
+  @Override
+  public Set<W> getActiveAndNewWindows() {
+    // Only supported when merging.
+    throw new UnsupportedOperationException(
+        "getActiveAndNewWindows is only supported for merging WindowFns");
+  }
+
+  @Override
+  public boolean isActive(W window) {
+    // Windows should never disappear, since we don't support merging.
+    return true;
+  }
+
+  @Override
+  public boolean isActiveOrNew(W window) {
+    return true;
+  }
+
+  @Override
+  public void ensureWindowExists(W window) {}
+
+  @Override
+  public void ensureWindowIsActive(W window) {}
+
+  @Override
+  @VisibleForTesting
+  public void addActiveForTesting(W window) {}
+
+  @Override
+  public void remove(W window) {}
+
+  @Override
+  public void merge(MergeCallback<W> mergeCallback) throws Exception {}
+
+  @Override
+  public void merged(W window) {}
+
+  @Override
+  public Set<W> readStateAddresses(W window) {
+    return ImmutableSet.of(window);
+  }
+
+  @Override
+  public W writeStateAddress(W window) {
+    return window;
+  }
+
+  @Override
+  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
+    return mergeResult;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index c71897d..eae1a8b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.ActiveWindowSet; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 6f7bbcf..96e76b7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -50,9 +50,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ActiveWindowSet; -import org.apache.beam.sdk.util.MergingActiveWindowSet; -import org.apache.beam.sdk.util.NonMergingActiveWindowSet; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import 
org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java index 1c06e8d..e3df4ee 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java @@ -24,12 +24,12 @@ import com.google.common.collect.Iterables; import java.util.Collection; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ActiveWindowSet; import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo; import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ActiveWindowSet; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.state.MergingStateAccessor; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java new file mode 100644 index 0000000..a4928e3 --- /dev/null +++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MergingActiveWindowSet}, driven with {@link Sessions} windows and a mocked {@link ActiveWindowSet.MergeCallback}.
+ */ +@RunWith(JUnit4.class) +public class MergingActiveWindowSetTest { + private Sessions windowFn; + private StateInternals<String> state; + private MergingActiveWindowSet<IntervalWindow> set; + private ActiveWindowSet.MergeCallback<IntervalWindow> callback; + + @Before + public void setup() { + windowFn = Sessions.withGapDuration(Duration.millis(10)); + state = InMemoryStateInternals.forKey("dummyKey"); + set = new MergingActiveWindowSet<>(windowFn, state); + @SuppressWarnings("unchecked") + ActiveWindowSet.MergeCallback<IntervalWindow> + callback = mock(ActiveWindowSet.MergeCallback.class); + this.callback = callback; + } + + @After + public void after() { + set = null; + state = null; + windowFn = null; + } + + private void add(long... instants) { + for (final long instant : instants) { + System.out.println("ADD " + instant); + Sessions.AssignContext context = windowFn.new AssignContext() { + @Override + public Object element() { + return (Object) instant; + } + + @Override + public Instant timestamp() { + return new Instant(instant); + } + + @Override + public BoundedWindow window() { + return GlobalWindow.INSTANCE; + } + }; + + for (IntervalWindow window : windowFn.assignWindows(context)) { + set.ensureWindowExists(window); + } + } + } + + private Map<IntervalWindow, IntervalWindow> merge( + List<IntervalWindow> toBeMerged, + IntervalWindow mergeResult) throws Exception { + IntervalWindow predictedPostMergeWriteStateAddress = + set.mergedWriteStateAddress(toBeMerged, mergeResult); + + System.out.println("BEFORE MERGE"); + System.out.println(set); + Map<IntervalWindow, IntervalWindow> map = new HashMap<>(); + for (IntervalWindow window : toBeMerged) { + System.out.println("WILL MERGE " + window + " INTO " + mergeResult); + map.put(window, mergeResult); + } + System.out.println("AFTER MERGE"); + set.merge(callback); + verify(callback).onMerge(toBeMerged, mergeResult); + System.out.println(set); + + assertEquals(predictedPostMergeWriteStateAddress,
+ set.writeStateAddress(mergeResult)); + + return map; + } + + private void activate(Map<IntervalWindow, IntervalWindow> map, long... instants) { + for (long instant : instants) { + IntervalWindow window = window(instant, 10); + IntervalWindow active = map.get(window); + if (active == null) { + active = window; + } + System.out.println("ACTIVATE " + active); + set.ensureWindowIsActive(active); + } + set.checkInvariants(); + } + + private void cleanup() { + System.out.println("CLEANUP"); + set.cleanupTemporaryWindows(); + set.checkInvariants(); + System.out.println(set); + set.persist(); + MergingActiveWindowSet<IntervalWindow> reloaded = + new MergingActiveWindowSet<>(windowFn, state); + reloaded.checkInvariants(); + assertEquals(set, reloaded); + } + + private IntervalWindow window(long start, long size) { + return new IntervalWindow(new Instant(start), new Duration(size)); + } + + @Test + public void testLifecycle() throws Exception { + // Step 1: New elements show up, introducing NEW windows which are partially merged. + // NEW 1+10 + // NEW 2+10 + // NEW 15+10 + // => + // ACTIVE 1+11 (target 1+11) + // ACTIVE 15+10 (target 15+10) + add(1, 2, 15); + assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(15, 10)), + set.getActiveAndNewWindows()); + Map<IntervalWindow, IntervalWindow> map = + merge(ImmutableList.of(window(1, 10), window(2, 10)), + window(1, 11)); + activate(map, 1, 2, 15); + assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 11))); + assertEquals( + ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10))); + cleanup(); + + // Step 2: Another element, merged into an existing ACTIVE window.
+ // NEW 3+10 + // => + // ACTIVE 1+12 (target 1+11) + // ACTIVE 15+10 (target 15+10) + add(3); + assertEquals(ImmutableSet.of(window(3, 10), window(1, 11), window(15, 10)), + set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 11), window(3, 10)), + window(1, 12)); + activate(map, 3); + assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 12))); + assertEquals( + ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10))); + cleanup(); + + // Step 3: Another element, causing two ACTIVE windows to be merged. + // NEW 8+10 + // => + // ACTIVE 1+24 (target 1+11) + add(8); + assertEquals(ImmutableSet.of(window(8, 10), window(1, 12), window(15, 10)), + set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)), + window(1, 24)); + activate(map, 8); + assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); + cleanup(); + + // Step 4: Another element, merged into an existing ACTIVE window. + // NEW 9+10 + // => + // ACTIVE 1+24 (target 1+11) + add(9); + assertEquals(ImmutableSet.of(window(9, 10), window(1, 24)), set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 24), window(9, 10)), + window(1, 24)); + activate(map, 9); + assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); + cleanup(); + + // Step 5: Another element reusing earlier window, merged into an existing ACTIVE window.
+ // NEW 1+10 + // => + // ACTIVE 1+24 (target 1+11) + add(1); + assertEquals(ImmutableSet.of(window(1, 10), window(1, 24)), set.getActiveAndNewWindows()); + map = merge(ImmutableList.of(window(1, 10), window(1, 24)), + window(1, 24)); + activate(map, 1); + assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); + assertEquals( + ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); + cleanup(); + + // Step 6: Window is closed. + set.remove(window(1, 24)); + cleanup(); + assertTrue(set.getActiveAndNewWindows().isEmpty()); + } + + @Test + public void testLegacyState() { + // Pre 1.4 we merged window state lazily. + // Simulate loading an active window set with multiple state address windows. + set.addActiveForTesting(window(1, 12), + ImmutableList.of(window(1, 10), window(2, 10), window(3, 10))); + + + // Make sure we can detect and repair the state. + assertTrue(set.isActive(window(1, 12))); + assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(3, 10)), + set.readStateAddresses(window(1, 12))); + assertEquals(window(1, 10), + set.mergedWriteStateAddress( + ImmutableList.of(window(1, 10), window(2, 10), window(3, 10)), + window(1, 12))); + set.merged(window(1, 12)); + cleanup(); + + // From then on we are back to the eager case.
+ assertEquals(ImmutableSet.of(window(1, 10)), set.readStateAddresses(window(1, 12))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 8969405..2a626d4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -32,15 +32,15 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ActiveWindowSet; +import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; import org.apache.beam.runners.core.InMemoryTimerInternals; +import org.apache.beam.runners.core.MergingActiveWindowSet; +import org.apache.beam.runners.core.NonMergingActiveWindowSet; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ActiveWindowSet; -import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; -import org.apache.beam.sdk.util.MergingActiveWindowSet; -import org.apache.beam.sdk.util.NonMergingActiveWindowSet; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.Timers; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java deleted file mode 100644 index 2e0af29..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.annotations.VisibleForTesting; -import java.util.Collection; -import java.util.Set; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; - -/** - * Track which windows are <i>active</i>, and the <i>state address window(s)</i> under which their - * state is stored. Also help with the multi-step process of merging windows and their associated - * state. - * - * <p>When windows are merged we must also merge their state. For example, we may need to - * concatenate buffered elements, sum a count of elements, or find a new minimum timestamp.
- * If we start with two windows {@code Wa} and {@code Wb} and later discover they should be - * merged into window {@code Wab} then, naively, we must copy and merge the states of {@code Wa} - * and {@code Wb} into {@code Wab}. - * - * <p>However, the common case for merging windows is for a new window to be merged into an - * existing window. Thus, if {@code Wa} is the existing window and {@code Wb} the new window it - * is more efficient to leave the state for {@code Wa} where it is, and simply redirect {@code - * Wab} to it. In this case we say {@code Wab} has a state address window of {@code Wa}. - * - * <p>Even if windows {@code Wa} and {@code Wb} already have state, it can still be more efficient - * to append the state of {@code Wb} onto {@code Wa} rather than copy the state from {@code Wa} - * and {@code Wb} into {@code Wab}. - * - * <p>We use the following terminology for windows: - * <ol> - * <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged - * away. The window may have one (or more) state address windows under which its - * non-empty state is stored. A state value for an ACTIVE window must be derived by reading - * the state in (all of) its state address windows. Note that only pre 1.4 pipelines - * use multiple state address windows per active window. From 1.4 onwards we eagerly merge - * window state into a single state address window. - * <li><b>NEW</b>: The initial state for a window of an incoming element which is not - * already ACTIVE. We have not yet called {@link WindowFn#mergeWindows}, and so don't yet know - * whether the window will be merged into another NEW or ACTIVE window, or will - * become an ACTIVE window in its own right. - * </ol> - * - * <p>If no windows will ever be merged we can use the trivial implementation {@link - * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this data structure is in - * {@link MergingActiveWindowSet}.
- * - * @param <W> the type of window being managed - */ -public interface ActiveWindowSet<W extends BoundedWindow> { - /** - * Callback for {@link #merge}. - */ - public interface MergeCallback<W extends BoundedWindow> { - /** - * Called when windows are about to be merged, but before any {@link #onMerge} callback - * has been made. - * - * @param toBeMerged the windows about to be merged. - * @param mergeResult the result window, either a member of {@code toBeMerged} or new. - */ - void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception; - - /** - * Called when windows are about to be merged, after all {@link #prefetchOnMerge} calls - * have been made, but before the active window set has been updated to reflect the merge. - * - * @param toBeMerged the windows about to be merged. - * @param mergeResult the result window, either a member of {@code toBeMerged} or new. - */ - void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception; - } - - /** - * Remove any remaining NEW windows since they were not promoted to being ACTIVE - * by {@link #ensureWindowIsActive} and we don't need to record anything about them. - */ - void cleanupTemporaryWindows(); - - /** - * Save any state changes needed. - */ - void persist(); - - /** - * Return (a view of) the set of currently ACTIVE and NEW windows. - */ - Set<W> getActiveAndNewWindows(); - - /** - * Return {@code true} if {@code window} is ACTIVE. - */ - boolean isActive(W window); - - /** - * Return {@code true} if {@code window} is ACTIVE or NEW. - */ - boolean isActiveOrNew(W window); - - /** - * Called when an incoming element indicates it is a member of {@code window}, but before we - * have started processing that element. If {@code window} is not already known to be ACTIVE, - * then add it as NEW. - */ - void ensureWindowExists(W window); - - /** - * Called when a NEW or ACTIVE window is now known to be ACTIVE.
- * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only state address - * window). - */ - void ensureWindowIsActive(W window); - - /** - * If {@code window} is not already known to be ACTIVE then add it as ACTIVE. - * For testing only. - */ - @VisibleForTesting - void addActiveForTesting(W window); - - /** - * Remove {@code window} from the set. - */ - void remove(W window); - - /** - * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated with this window set, - * merging as many of the NEW and ACTIVE windows as possible. {@code mergeCallback} will be - * invoked for each group of windows that are merged. After this all merge result windows will - * be ACTIVE, and all windows which have been merged away will be neither ACTIVE nor NEW. - */ - void merge(MergeCallback<W> mergeCallback) throws Exception; - - /** - * Signal that all state in {@link #readStateAddresses} for {@code window} has been merged into - * the {@link #writeStateAddress} for {@code window}. - */ - void merged(W window); - - /** - * Return the state address windows for ACTIVE {@code window} from which all state associated - * should be read and merged. - */ - Set<W> readStateAddresses(W window); - - /** - * Return the state address window of ACTIVE {@code window} into which all new state should be - * written. Always one of the results of {@link #readStateAddresses}. - */ - W writeStateAddress(W window); - - /** - * Return the state address window into which all new state should be written after - * ACTIVE windows {@code toBeMerged} have been merged into {@code mergeResult}.
- */ - W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java deleted file mode 100644 index 066579b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.SetCoder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.ValueState; - -/** - * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations. - */ -public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> { - private final WindowFn<Object, W> windowFn; - - /** - * Map ACTIVE and NEW windows to their state address windows. Persisted. - * <ul> - * <li>A NEW window has the empty set as its value. - * <li>An ACTIVE window has its (typically singleton) set of state address windows as - * its value. - * </ul> - */ - private final Map<W, Set<W>> activeWindowToStateAddressWindows; - - /** - * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit. - * Used to avoid writing to state if no changes have been made during the work unit. - */ - private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows; - - /** - * Handle representing our state in the backend.
- */ - private final ValueState<Map<W, Set<W>>> valueState; - - public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) { - this.windowFn = windowFn; - - StateTag<Object, ValueState<Map<W, Set<W>>>> tag = - StateTags.makeSystemTagInternal(StateTags.value( - "tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder())))); - valueState = state.state(StateNamespaces.global(), tag); - // Little use trying to prefetch this state since the ReduceFnRunner - // is stymied until it is available. - activeWindowToStateAddressWindows = emptyIfNull(valueState.read()); - originalActiveWindowToStateAddressWindows = deepCopy(activeWindowToStateAddressWindows); - } - - @Override - public void cleanupTemporaryWindows() { - // All NEW windows can be forgotten since they must have ended up being merged into - // some other ACTIVE window. - Iterator<Map.Entry<W, Set<W>>> iter = activeWindowToStateAddressWindows.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<W, Set<W>> entry = iter.next(); - if (entry.getValue().isEmpty()) { - iter.remove(); - } - } - } - - @Override - public void persist() { - checkInvariants(); - if (activeWindowToStateAddressWindows.isEmpty()) { - // Force all persistent state to disappear. - valueState.clear(); - return; - } - if (activeWindowToStateAddressWindows.equals(originalActiveWindowToStateAddressWindows)) { - // No change. - return; - } - valueState.write(activeWindowToStateAddressWindows); - // No need to update originalActiveWindowToStateAddressWindows since this object is about to - // become garbage.
- } - - @Override - public Set<W> getActiveAndNewWindows() { - return activeWindowToStateAddressWindows.keySet(); - } - - @Override - public boolean isActive(W window) { - Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); - return stateAddressWindows != null && !stateAddressWindows.isEmpty(); - } - - @Override - public boolean isActiveOrNew(W window) { - return activeWindowToStateAddressWindows.containsKey(window); - } - - @Override - public void ensureWindowExists(W window) { - if (!activeWindowToStateAddressWindows.containsKey(window)) { - // Add window as NEW. - activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>()); - } - } - - @Override - public void ensureWindowIsActive(W window) { - Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); - checkState(stateAddressWindows != null, - "Cannot ensure window %s is active since it is neither ACTIVE nor NEW", - window); - if (stateAddressWindows != null && stateAddressWindows.isEmpty()) { - // Window was NEW, make it ACTIVE with itself as its state address window. - stateAddressWindows.add(window); - } - } - - @Override - @VisibleForTesting - public void addActiveForTesting(W window) { - if (!activeWindowToStateAddressWindows.containsKey(window)) { - // Make window ACTIVE with itself as its state address window.
- Set<W> stateAddressWindows = new LinkedHashSet<>(); - stateAddressWindows.add(window); - activeWindowToStateAddressWindows.put(window, stateAddressWindows); - } - } - - @VisibleForTesting - public void addActiveForTesting(W window, Iterable<W> stateAddressWindows) { - if (!activeWindowToStateAddressWindows.containsKey(window)) { - activeWindowToStateAddressWindows.put(window, Sets.newLinkedHashSet(stateAddressWindows)); - } - } - - @Override - public void remove(W window) { - activeWindowToStateAddressWindows.remove(window); - } - - private class MergeContextImpl extends WindowFn<Object, W>.MergeContext { - private MergeCallback<W> mergeCallback; - private final List<Collection<W>> allToBeMerged; - private final List<W> allMergeResults; - private final Set<W> seen; - - public MergeContextImpl(MergeCallback<W> mergeCallback) { - windowFn.super(); - this.mergeCallback = mergeCallback; - allToBeMerged = new ArrayList<>(); - allMergeResults = new ArrayList<>(); - seen = new HashSet<>(); - } - - @Override - public Collection<W> windows() { - return activeWindowToStateAddressWindows.keySet(); - } - - @Override - public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception { - // The arguments have come from userland.
- checkNotNull(toBeMerged); - checkNotNull(mergeResult); - List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size()); - boolean includesMergeResult = false; - for (W window : toBeMerged) { - checkNotNull(window); - checkState(isActiveOrNew(window), "Expecting merge window %s to be ACTIVE or NEW", window); - if (window.equals(mergeResult)) { - includesMergeResult = true; - } - boolean notDup = seen.add(window); - checkState(notDup, "Expecting merge window %s to appear in at most one merge set", window); - copyOfToBeMerged.add(window); - } - if (!includesMergeResult) { - checkState(!isActive(mergeResult), "Expecting result window %s to be NEW", mergeResult); - } - allToBeMerged.add(copyOfToBeMerged); - allMergeResults.add(mergeResult); - } - - public void recordMerges() throws Exception { - for (int i = 0; i < allToBeMerged.size(); i++) { - mergeCallback.prefetchOnMerge(allToBeMerged.get(i), allMergeResults.get(i)); - } - for (int i = 0; i < allToBeMerged.size(); i++) { - mergeCallback.onMerge(allToBeMerged.get(i), allMergeResults.get(i)); - recordMerge(allToBeMerged.get(i), allMergeResults.get(i)); - } - allToBeMerged.clear(); - allMergeResults.clear(); - seen.clear(); - } - } - - @Override - public void merge(MergeCallback<W> mergeCallback) throws Exception { - MergeContextImpl context = new MergeContextImpl(mergeCallback); - - // See what the window function does with the NEW and already ACTIVE windows. - // Entering userland. - windowFn.mergeWindows(context); - - // Actually do the merging and invoke the callbacks. - context.recordMerges(); - } - - /** - * A {@link WindowFn#mergeWindows} call has determined that {@code toBeMerged} (which must - * all be ACTIVE or NEW) should be considered equivalent to {@code mergeResult} (which is either a - * member of {@code toBeMerged} or is a new window). Make the corresponding change in - * the active window set.
- */ - private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { - // Note that mergedWriteStateAddress must predict the result of writeStateAddress - // after the corresponding merge has been applied. - // Thus we must ensure the first state address window in the merged result here is - // deterministic. - // Thus we use a linked hash set. - Set<W> newStateAddressWindows = new LinkedHashSet<>(); - Set<W> existingStateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult); - if (existingStateAddressWindows != null) { - // Preserve all the existing state address windows for mergeResult. - newStateAddressWindows.addAll(existingStateAddressWindows); - } - - for (W other : toBeMerged) { - Set<W> otherStateAddressWindows = activeWindowToStateAddressWindows.get(other); - checkState(otherStateAddressWindows != null, - "Window %s is not ACTIVE or NEW", other); - - if (otherStateAddressWindows != null) { - for (W otherStateAddressWindow : otherStateAddressWindows) { - // Since otherStateAddressWindow equiv other AND other equiv mergeResult - // THEN otherStateAddressWindow equiv mergeResult. - newStateAddressWindows.add(otherStateAddressWindow); - } - } - activeWindowToStateAddressWindows.remove(other); - - // Now other equiv mergeResult. - } - - if (newStateAddressWindows.isEmpty()) { - // If newStateAddressWindows is empty then toBeMerged must have only contained NEW windows. - // Promote mergeResult to be ACTIVE now. - newStateAddressWindows.add(mergeResult); - } - - activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows); - - merged(mergeResult); - } - - @Override - public void merged(W window) { - // Collapse to just the first state address window. NOTE(review): if window is still NEW (empty set), getFirst returns null and null is added -- confirm callers only pass ACTIVE windows.
- Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); - checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window); - W first = Iterables.getFirst(stateAddressWindows, null); - stateAddressWindows.clear(); - stateAddressWindows.add(first); - } - - /** - * Return the state address windows for ACTIVE {@code window} from which all state associated - * should be read and merged. - */ - @Override - public Set<W> readStateAddresses(W window) { - Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); - checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window); - return stateAddressWindows; - } - - /** - * Return the state address window of ACTIVE {@code window} into which all new state should be - * written. - */ - @Override - public W writeStateAddress(W window) { - Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window); - checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window); - W result = Iterables.getFirst(stateAddressWindows, null); - checkState(result != null, "Window %s is still NEW", window); - return result; - } - - @Override - public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) { - Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult); - if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) { - return Iterables.getFirst(stateAddressWindows, null); - } - for (W mergedWindow : toBeMerged) { - stateAddressWindows = activeWindowToStateAddressWindows.get(mergedWindow); - if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) { - return Iterables.getFirst(stateAddressWindows, null); - } - } - return mergeResult; - } - - @VisibleForTesting - public void checkInvariants() { - Set<W> knownStateAddressWindows = new HashSet<>(); - for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) { - W active = entry.getKey(); - checkState(!entry.getValue().isEmpty(), -
"Unexpected empty state address window set for ACTIVE window %s", - active); - for (W stateAddressWindow : entry.getValue()) { - checkState(knownStateAddressWindows.add(stateAddressWindow), - "%s is in more than one state address window set", - stateAddressWindow); - } - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("MergingActiveWindowSet {\n"); - for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) { - W active = entry.getKey(); - Set<W> stateAddressWindows = entry.getValue(); - if (stateAddressWindows.isEmpty()) { - sb.append(" NEW "); - sb.append(active); - sb.append('\n'); - } else { - sb.append(" ACTIVE "); - sb.append(active); - sb.append(":\n"); - for (W stateAddressWindow : stateAddressWindows) { - sb.append(" "); - sb.append(stateAddressWindow); - sb.append("\n"); - } - } - } - sb.append("}"); - return sb.toString(); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof MergingActiveWindowSet)) { - return false; - } - - @SuppressWarnings("unchecked") - MergingActiveWindowSet<W> other = (MergingActiveWindowSet<W>) o; - - return activeWindowToStateAddressWindows.equals(other.activeWindowToStateAddressWindows); - } - - @Override - public int hashCode() { - return Objects.hashCode(activeWindowToStateAddressWindows); - } - - /** - * Replace null {@code multimap} with empty map, and replace null entries in {@code multimap} - * with - * empty sets. - */ - private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap) { - if (multimap == null) { - return new HashMap<>(); - } else { - for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) { - if (entry.getValue() == null) { - entry.setValue(new LinkedHashSet<W>()); - } - } - return multimap; - } - } - - /** - * Return a deep copy of {@code multimap}.
- */ - private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) { - Map<W, Set<W>> newMultimap = new HashMap<>(); - for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) { - newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue())); - } - return newMultimap; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java deleted file mode 100644 index 99d591b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.sdk.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import java.util.Collection; -import java.util.Set; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; - -/** - * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't - * support - * merging. - * - * @param <W> the type of windows being managed - */ -public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> { - @Override - public void cleanupTemporaryWindows() {} - - @Override - public void persist() {} - - @Override - public Set<W> getActiveAndNewWindows() { - // Only supported when merging. - throw new java.lang.UnsupportedOperationException(); - } - - @Override - public boolean isActive(W window) { - // Windows should never disappear, since we don't support merging. - return true; - } - - @Override - public boolean isActiveOrNew(W window) { - return true; - } - - @Override - public void ensureWindowExists(W window) {} - - @Override - public void ensureWindowIsActive(W window) {} - - @Override - @VisibleForTesting - public void addActiveForTesting(W window) {} - - @Override - public void remove(W window) {} - - @Override - public void merge(MergeCallback<W> mergeCallback) throws Exception {} - - @Override - public void merged(W window) {} - - @Override - public Set<W> readStateAddresses(W window) { - return ImmutableSet.of(window); - } - - @Override - public W writeStateAddress(W window) { - return window; - } - - @Override - public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) { - return mergeResult; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java deleted file mode 100644 index 676a25a..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateInternals; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test NonMergingActiveWindowSet. - */ -@RunWith(JUnit4.class) -public class MergingActiveWindowSetTest { - private Sessions windowFn; - private StateInternals<String> state; - private MergingActiveWindowSet<IntervalWindow> set; - private ActiveWindowSet.MergeCallback<IntervalWindow> callback; - - @Before - public void setup() { - windowFn = Sessions.withGapDuration(Duration.millis(10)); - state = InMemoryStateInternals.forKey("dummyKey"); - set = new MergingActiveWindowSet<>(windowFn, state); - @SuppressWarnings("unchecked") - ActiveWindowSet.MergeCallback<IntervalWindow> - callback = mock(ActiveWindowSet.MergeCallback.class); - this.callback = callback; - } - - @After - public void after() { - set = null; - state = null; - windowFn = null; - } - - private void add(long... 
instants) { - for (final long instant : instants) { - System.out.println("ADD " + instant); - Sessions.AssignContext context = windowFn.new AssignContext() { - @Override - public Object element() { - return (Object) instant; - } - - @Override - public Instant timestamp() { - return new Instant(instant); - } - - @Override - public BoundedWindow window() { - return GlobalWindow.INSTANCE; - } - }; - - for (IntervalWindow window : windowFn.assignWindows(context)) { - set.ensureWindowExists(window); - } - } - } - - private Map<IntervalWindow, IntervalWindow> merge( - List<IntervalWindow> toBeMerged, - IntervalWindow mergeResult) throws Exception { - IntervalWindow predictedPostMergeWriteStateAddress = - set.mergedWriteStateAddress(toBeMerged, mergeResult); - - System.out.println("BEFORE MERGE"); - System.out.println(set); - Map<IntervalWindow, IntervalWindow> map = new HashMap<>(); - for (IntervalWindow window : toBeMerged) { - System.out.println("WILL MERGE " + window + " INTO " + mergeResult); - map.put(window, mergeResult); - } - System.out.println("AFTER MERGE"); - set.merge(callback); - verify(callback).onMerge(toBeMerged, mergeResult); - System.out.println(set); - - assertEquals(predictedPostMergeWriteStateAddress, set.writeStateAddress(mergeResult)); - - return map; - } - - private void activate(Map<IntervalWindow, IntervalWindow> map, long... 
instants) { - for (long instant : instants) { - IntervalWindow window = window(instant, 10); - IntervalWindow active = map.get(window); - if (active == null) { - active = window; - } - System.out.println("ACTIVATE " + active); - set.ensureWindowIsActive(active); - } - set.checkInvariants(); - } - - private void cleanup() { - System.out.println("CLEANUP"); - set.cleanupTemporaryWindows(); - set.checkInvariants(); - System.out.println(set); - set.persist(); - MergingActiveWindowSet<IntervalWindow> reloaded = - new MergingActiveWindowSet<>(windowFn, state); - reloaded.checkInvariants(); - assertEquals(set, reloaded); - } - - private IntervalWindow window(long start, long size) { - return new IntervalWindow(new Instant(start), new Duration(size)); - } - - @Test - public void testLifecycle() throws Exception { - // Step 1: New elements show up, introducing NEW windows which are partially merged. - // NEW 1+10 - // NEW 2+10 - // NEW 15+10 - // => - // ACTIVE 1+11 (target 1+11) - // ACTIVE 15+10 (target 15+10) - add(1, 2, 15); - assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(15, 10)), - set.getActiveAndNewWindows()); - Map<IntervalWindow, IntervalWindow> map = - merge(ImmutableList.of(window(1, 10), window(2, 10)), - window(1, 11)); - activate(map, 1, 2, 15); - assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveAndNewWindows()); - assertEquals( - ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 11))); - assertEquals( - ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10))); - cleanup(); - - // Step 2: Another element, merged into an existing ACTIVE window. 
- // NEW 3+10 - // => - // ACTIVE 1+12 (target 1+11) - // ACTIVE 15+10 (target 15+10) - add(3); - assertEquals(ImmutableSet.of(window(3, 10), window(1, 11), window(15, 10)), - set.getActiveAndNewWindows()); - map = merge(ImmutableList.of(window(1, 11), window(3, 10)), - window(1, 12)); - activate(map, 3); - assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveAndNewWindows()); - assertEquals( - ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 12))); - assertEquals( - ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10))); - cleanup(); - - // Step 3: Another element, causing two ACTIVE windows to be merged. - // NEW 8+10 - // => - // ACTIVE 1+24 (target 1+11) - add(8); - assertEquals(ImmutableSet.of(window(8, 10), window(1, 12), window(15, 10)), - set.getActiveAndNewWindows()); - map = merge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)), - window(1, 24)); - activate(map, 8); - assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); - assertEquals( - ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); - cleanup(); - - // Step 4: Another element, merged into an existing ACTIVE window. - // NEW 9+10 - // => - // ACTIVE 1+24 (target 1+11) - add(9); - assertEquals(ImmutableSet.of(window(9, 10), window(1, 24)), set.getActiveAndNewWindows()); - map = merge(ImmutableList.of(window(1, 24), window(9, 10)), - window(1, 24)); - activate(map, 9); - assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); - assertEquals( - ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); - cleanup(); - - // Step 5: Another element reusing earlier window, merged into an existing ACTIVE window. 
- // NEW 1+10 - // => - // ACTIVE 1+24 (target 1+11) - add(1); - assertEquals(ImmutableSet.of(window(1, 10), window(1, 24)), set.getActiveAndNewWindows()); - map = merge(ImmutableList.of(window(1, 10), window(1, 24)), - window(1, 24)); - activate(map, 1); - assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows()); - assertEquals( - ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24))); - cleanup(); - - // Step 6: Window is closed. - set.remove(window(1, 24)); - cleanup(); - assertTrue(set.getActiveAndNewWindows().isEmpty()); - } - - @Test - public void testLegacyState() { - // Pre 1.4 we merged window state lazily. - // Simulate loading an active window set with multiple state address windows. - set.addActiveForTesting(window(1, 12), - ImmutableList.of(window(1, 10), window(2, 10), window(3, 10))); - - - // Make sure we can detect and repair the state. - assertTrue(set.isActive(window(1, 12))); - assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(3, 10)), - set.readStateAddresses(window(1, 12))); - assertEquals(window(1, 10), - set.mergedWriteStateAddress( - ImmutableList.of(window(1, 10), window(2, 10), window(3, 10)), - window(1, 12))); - set.merged(window(1, 12)); - cleanup(); - - // For then on we are back to the eager case. - assertEquals(ImmutableSet.of(window(1, 10)), set.readStateAddresses(window(1, 12))); - } -}