Repository: incubator-beam Updated Branches: refs/heads/master 4b682039d -> c8f2cdb22
Revert "Improvements to ReduceFnRunner prefetching" This reverts commit 4282c67c5fa4dea2fe6c8695e0ea23f383c6457b, which contained some incompatibilities outside of runners-core. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaa3b91e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaa3b91e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaa3b91e Branch: refs/heads/master Commit: aaa3b91e1e7b39dd585314a6017235cdd127e923 Parents: 4b68203 Author: Kenneth Knowles <k...@google.com> Authored: Wed Nov 30 15:21:53 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Nov 30 15:21:53 2016 -0800 ---------------------------------------------------------------------- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +- .../beam/runners/core/PaneInfoTracker.java | 4 - .../runners/core/ReduceFnContextFactory.java | 9 +- .../beam/runners/core/ReduceFnRunner.java | 488 +++++++------------ .../apache/beam/runners/core/WatermarkHold.java | 5 - .../triggers/TriggerStateMachineRunner.java | 14 +- .../beam/runners/core/ReduceFnTester.java | 4 +- .../GroupAlsoByWindowEvaluatorFactory.java | 5 +- .../apache/beam/sdk/transforms/DoFnTester.java | 6 +- .../sdk/util/state/InMemoryTimerInternals.java | 22 +- .../beam/sdk/util/state/TimerCallback.java | 9 +- .../util/state/InMemoryTimerInternalsTest.java | 54 +- 12 files changed, 224 insertions(+), 407 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 294f21d..8b10813 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; @@ -72,9 +73,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn< @Override public void processElement(ProcessContext c) throws Exception { - KeyedWorkItem<K, InputT> keyedWorkItem = c.element(); + KeyedWorkItem<K, InputT> element = c.element(); - K key = keyedWorkItem.key(); + K key = c.element().key(); TimerInternals timerInternals = c.windowingInternals().timerInternals(); StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); @@ -92,8 +93,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn< reduceFn, c.getPipelineOptions()); - reduceFnRunner.processElements(keyedWorkItem.elementsIterable()); - reduceFnRunner.onTimers(keyedWorkItem.timersIterable()); + reduceFnRunner.processElements(element.elementsIterable()); + for (TimerData timer : element.timersIterable()) { + reduceFnRunner.onTimer(timer); + } reduceFnRunner.persist(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index 69a4cfd..8140243 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -54,10 +54,6 @@ public class PaneInfoTracker { state.access(PANE_INFO_TAG).clear(); } - public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) { - context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater(); - } - /** * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane * info includes the timing for the pane, who's calculation is quite subtle. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index c5bda9b..539126a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateAccessor; import org.apache.beam.sdk.util.state.StateContext; @@ -116,7 +117,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { } public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window, - PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) { + ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) { return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks); } @@ -388,11 +389,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext { private final StateAccessorImpl<K, W> state; - private final PaneInfo pane; + private final ReadableState<PaneInfo> pane; private final OnTriggerCallbacks<OutputT> callbacks; private final TimersImpl timers; - private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane, + private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane, OnTriggerCallbacks<OutputT> callbacks) { reduceFn.super(); this.state = state; @@ -423,7 +424,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { @Override public PaneInfo paneInfo() { - return pane; + return pane.read(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 3a82be9..a686f46 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -21,15 +21,12 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -61,6 +58,7 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; import org.apache.beam.sdk.util.state.TimerCallback; @@ -270,32 +268,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme return activeWindows.getActiveAndNewWindows().isEmpty(); } - private Set<W> openWindows(Collection<W> windows) { - Set<W> result = new HashSet<>(); - for (W window : windows) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base( - window, StateStyle.DIRECT); - if (!triggerRunner.isClosed(directContext.state())) { - result.add(window); - } - } - return result; - } - - private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception { - Collection<W> result = new ArrayList<>(); - // Filter out timers that didn't trigger. - for (W window : windows) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = - contextFactory.base(window, StateStyle.DIRECT); - if (triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())) { - result.add(window); - } - } - return result; - } - /** * Incorporate {@code values} into the underlying reduce function, and manage holds, timers, * triggers, and window merging. @@ -321,54 +293,25 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme * </ol> */ public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception { - if (!values.iterator().hasNext()) { - return; - } - - // Determine all the windows for elements. - Set<W> windows = collectWindows(values); // If an incoming element introduces a new window, attempt to merge it into an existing // window eagerly. - Map<W, W> windowToMergeResult = mergeWindows(windows); - if (!windowToMergeResult.isEmpty()) { - // Update windows by removing all windows that were merged away and adding - // the windows they were merged to. We add after completing all the - // removals to avoid removing a window that was also added. - List<W> addedWindows = new ArrayList<>(windowToMergeResult.size()); - for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) { - windows.remove(entry.getKey()); - addedWindows.add(entry.getValue()); - } - windows.addAll(addedWindows); - } + Map<W, W> windowToMergeResult = collectAndMergeWindows(values); - prefetchWindowsForValues(windows); - - // All windows that are open before element processing may need to fire. - Set<W> windowsToConsider = openWindows(windows); + Set<W> windowsToConsider = new HashSet<>(); // Process each element, using the updated activeWindows determined by collectAndMergeWindows. for (WindowedValue<InputT> value : values) { - processElement(windowToMergeResult, value); + windowsToConsider.addAll(processElement(windowToMergeResult, value)); } - // Now that we've processed the elements, see if any of the windows need to fire. - // Prefetch state necessary to determine if the triggers should fire. + // Trigger output from any window for which the trigger is ready for (W mergedWindow : windowsToConsider) { - triggerRunner.prefetchShouldFire( - mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state()); - } - // Filter to windows that are firing. - Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider); - // Prefetch windows that are firing. - for (W window : windowsToFire) { - prefetchEmit(contextFactory.base(window, StateStyle.DIRECT), - contextFactory.base(window, StateStyle.RENAMED)); - } - // Trigger output from firing windows. - for (W window : windowsToFire) { - emit(contextFactory.base(window, StateStyle.DIRECT), - contextFactory.base(window, StateStyle.RENAMED)); + ReduceFn<K, InputT, OutputT, W>.Context directContext = + contextFactory.base(mergedWindow, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.Context renamedContext = + contextFactory.base(mergedWindow, StateStyle.RENAMED); + triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); + emitIfAppropriate(directContext, renamedContext); } // We're all done with merging and emitting elements so can compress the activeWindow state. @@ -382,61 +325,52 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme } /** - * Extract the windows associated with the values. + * Extract the windows associated with the values, and invoke merge. Return a map + * from windows to the merge result window. If a window is not in the domain of + * the result map then it did not get merged into a different window. */ - private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception { - Set<W> windows = new HashSet<>(); - for (WindowedValue<?> value : values) { - for (BoundedWindow untypedWindow : value.getWindows()) { - @SuppressWarnings("unchecked") - W window = (W) untypedWindow; - windows.add(window); - } - } - return windows; - } - - /** - * Invoke merge for the given windows and return a map from windows to the - * merge result window. Windows that were not merged are not present in the - * map. - */ - private Map<W, W> mergeWindows(Set<W> windows) throws Exception { + private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) + throws Exception { + // No-op if no merging can take place if (windowingStrategy.getWindowFn().isNonMerging()) { - // Return an empty map, indicating that every window is not merged. - return Collections.emptyMap(); + return ImmutableMap.of(); } - Map<W, W> windowToMergeResult = new HashMap<>(); // Collect the windows from all elements (except those which are too late) and // make sure they are already in the active window set or are added as NEW windows. - for (W window : windows) { - // For backwards compat with pre 1.4 only. - // We may still have ACTIVE windows with multiple state addresses, representing - // a window who's state has not yet been eagerly merged. - // We'll go ahead and merge that state now so that we don't have to worry about - // this legacy case anywhere else. - if (activeWindows.isActive(window)) { - Set<W> stateAddressWindows = activeWindows.readStateAddresses(window); - if (stateAddressWindows.size() > 1) { - // This is a legacy window who's state has not been eagerly merged. - // Do that now. - ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext = - contextFactory.forPremerge(window); - reduceFn.onMerge(premergeContext); - watermarkHold.onMerge(premergeContext); - activeWindows.merged(window); + for (WindowedValue<?> value : values) { + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + + // For backwards compat with pre 1.4 only. + // We may still have ACTIVE windows with multiple state addresses, representing + // a window who's state has not yet been eagerly merged. + // We'll go ahead and merge that state now so that we don't have to worry about + // this legacy case anywhere else. + if (activeWindows.isActive(window)) { + Set<W> stateAddressWindows = activeWindows.readStateAddresses(window); + if (stateAddressWindows.size() > 1) { + // This is a legacy window who's state has not been eagerly merged. + // Do that now. + ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext = + contextFactory.forPremerge(window); + reduceFn.onMerge(premergeContext); + watermarkHold.onMerge(premergeContext); + activeWindows.merged(window); + } } - } - // Add this window as NEW if it is not currently ACTIVE. - // If we had already seen this window and closed its trigger, then the - // window will not be currently ACTIVE. It will then be added as NEW here, - // and fall into the merging logic as usual. - activeWindows.ensureWindowExists(window); + // Add this window as NEW if it is not currently ACTIVE. + // If we had already seen this window and closed its trigger, then the + // window will not be currently ACTIVE. It will then be added as NEW here, + // and fall into the merging logic as usual. + activeWindows.ensureWindowExists(window); + } } // Merge all of the active windows and retain a mapping from source windows to result windows. + Map<W, W> windowToMergeResult = new HashMap<>(); activeWindows.merge(new OnMergeCallback(windowToMergeResult)); return windowToMergeResult; } @@ -538,50 +472,38 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme } /** - * Redirect element windows to the ACTIVE windows they have been merged into. - * The compressed representation (value, {window1, window2, ...}) actually represents - * distinct elements (value, window1), (value, window2), ... - * so if window1 and window2 merge, the resulting window will contain both copies - * of the value. + * Process an element. + * + * @param value the value being processed + * @return the set of windows in which the element was actually processed */ - private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult, - final Collection<? extends BoundedWindow> windows) { - return ImmutableSet.copyOf( - FluentIterable.from(windows).transform( - new Function<BoundedWindow, W>() { - @Override - public W apply(BoundedWindow untypedWindow) { - @SuppressWarnings("unchecked") - W window = (W) untypedWindow; - W mergedWindow = windowToMergeResult.get(window); - // If the element is not present in the map, the window is unmerged. - return (mergedWindow == null) ? window : mergedWindow; - } - } - )); - } + private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value) + throws Exception { + // Redirect element windows to the ACTIVE windows they have been merged into. + // The compressed representation (value, {window1, window2, ...}) actually represents + // distinct elements (value, window1), (value, window2), ... + // so if window1 and window2 merge, the resulting window will contain both copies + // of the value. + Collection<W> windows = new ArrayList<>(); + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + W mergeResult = windowToMergeResult.get(window); + if (mergeResult == null) { + mergeResult = window; + } + windows.add(mergeResult); + } - private void prefetchWindowsForValues(Collection<W> windows) { // Prefetch in each of the windows if we're going to need to process triggers for (W window : windows) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base( - window, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue( + window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); triggerRunner.prefetchForValue(window, directContext.state()); } - } - - /** - * Process an element. - * - * @param windowToMergeResult map of windows to merged windows. If a window is - * not present it is unmerged. - * @param value the value being processed - */ - private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value) - throws Exception { - ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows()); // Process the element for each (mergeResultWindow, not closed) window it belongs to. + List<W> triggerableWindows = new ArrayList<>(windows.size()); for (W window : windows) { ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); @@ -596,6 +518,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme continue; } + triggerableWindows.add(window); activeWindows.ensureWindowIsActive(window); ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); @@ -639,152 +562,102 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme // cannot take a trigger state from firing to non-firing. // (We don't actually assert this since it is too slow.) } + + return triggerableWindows; } /** - * Enriches TimerData with state necessary for processing a timer as well as - * common queries about a timer. + * Called when an end-of-window, garbage collection, or trigger-specific timer fires. */ - private class EnrichedTimerData { - public final Instant timestamp; - public final ReduceFn<K, InputT, OutputT, W>.Context directContext; - public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext; - // If this is an end-of-window timer then we may need to set a garbage collection timer - // if allowed lateness is non-zero. - public final boolean isEndOfWindow; - // If this is a garbage collection timer then we should trigger and - // garbage collect the window. We'll consider any timer at or after the - // end-of-window time to be a signal to garbage collect. - public final boolean isGarbageCollection; - - EnrichedTimerData( - TimerData timer, - ReduceFn<K, InputT, OutputT, W>.Context directContext, - ReduceFn<K, InputT, OutputT, W>.Context renamedContext) { - this.timestamp = timer.getTimestamp(); - this.directContext = directContext; - this.renamedContext = renamedContext; - W window = directContext.window(); - this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); - Instant cleanupTime = garbageCollectionTime(window); - this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); - } + public void onTimer(TimerData timer) throws Exception { + // Which window is the timer for? + checkArgument(timer.getNamespace() instanceof WindowNamespace, + "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); + @SuppressWarnings("unchecked") + WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace(); + W window = windowNamespace.getWindow(); + ReduceFn<K, InputT, OutputT, W>.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.Context renamedContext = + contextFactory.base(window, StateStyle.RENAMED); // Has this window had its trigger finish? // - The trigger may implement isClosed as constant false. // - If the window function does not support windowing then all windows will be considered // active. // So we must take conjunction of activeWindows and triggerRunner state. - public boolean windowIsActiveAndOpen() { - return activeWindows.isActive(directContext.window()) - && !triggerRunner.isClosed(directContext.state()); - } - } + boolean windowIsActiveAndOpen = + activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state()); - public void onTimers(Iterable<TimerData> timers) throws Exception { - if (!timers.iterator().hasNext()) { - return; + if (!windowIsActiveAndOpen) { + WindowTracing.debug( + "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } - // Create a reusable context for each timer and begin prefetching necessary - // state. - List<EnrichedTimerData> enrichedTimers = new LinkedList(); - for (TimerData timer : timers) { - checkArgument(timer.getNamespace() instanceof WindowNamespace, - "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); - @SuppressWarnings("unchecked") - WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace(); - W window = windowNamespace.getWindow(); - ReduceFn<K, InputT, OutputT, W>.Context directContext = - contextFactory.base(window, StateStyle.DIRECT); - ReduceFn<K, InputT, OutputT, W>.Context renamedContext = - contextFactory.base(window, StateStyle.RENAMED); - EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext); - enrichedTimers.add(enrichedTimer); - - // Perform prefetching of state to determine if the trigger should fire. - if (enrichedTimer.isGarbageCollection) { - triggerRunner.prefetchIsClosed(directContext.state()); - } else { - triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); + // If this is an end-of-window timer then we may need to set a garbage collection timer + // if allowed lateness is non-zero. + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + + // If this is a garbage collection timer then we should trigger and garbage collect the window. + // We'll consider any timer at or after the end-of-window time to be a signal to garbage + // collect. + Instant cleanupTime = garbageCollectionTime(window); + boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain() + && !timer.getTimestamp().isBefore(cleanupTime); + + if (isGarbageCollection) { + WindowTracing.debug( + "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + + if (windowIsActiveAndOpen) { + // We need to call onTrigger to emit the final pane if required. + // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, + // and the watermark has passed the end of the window. + @Nullable Instant newHold = + onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); + checkState(newHold == null, + "Hold placed at %s despite isFinished being true.", newHold); } - } - // For those windows that are active and open, prefetch the triggering or emitting state. - for (EnrichedTimerData timer : enrichedTimers) { - if (timer.windowIsActiveAndOpen()) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext; - if (timer.isGarbageCollection) { - prefetchOnTrigger(directContext, timer.renamedContext); - } else if (triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())) { - prefetchEmit(directContext, timer.renamedContext); - } + // Cleanup flavor B: Clear all the remaining state for this window since we'll never + // see elements for it again. + clearAllState(directContext, renamedContext, windowIsActiveAndOpen); + } else { + WindowTracing.debug( + "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + if (windowIsActiveAndOpen) { + emitIfAppropriate(directContext, renamedContext); } - } - - // Perform processing now that everything is prefetched. - for (EnrichedTimerData timer : enrichedTimers) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext; - ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext; - if (timer.isGarbageCollection) { - WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), timer.timestamp, - timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - - boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen(); - if (windowIsActiveAndOpen) { - // We need to call onTrigger to emit the final pane if required. - // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, - // and the watermark has passed the end of the window. - @Nullable - Instant newHold = onTrigger( - directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow); - checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); - } - - // Cleanup flavor B: Clear all the remaining state for this window since we'll never - // see elements for it again. - clearAllState(directContext, renamedContext, windowIsActiveAndOpen); - } else { - WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), timer.timestamp, - timerInternals.currentInputWatermarkTime(), + if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + + // Since we are processing an on-time firing we should schedule the garbage collection + // timer. (If getAllowedLateness is zero then the timer event will be considered a + // cleanup event and handled by the above). + // Note we must do this even if the trigger is finished so that we are sure to cleanup + // any final trigger finished bits. + checkState( + windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), + "Unexpected zero getAllowedLateness"); + WindowTracing.debug( + "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - if (timer.windowIsActiveAndOpen() - && triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())) { - emit(directContext, renamedContext); - } - - if (timer.isEndOfWindow) { - // If the window strategy trigger includes a watermark trigger then at this point - // there should be no data holds, either because we'd already cleared them on an - // earlier onTrigger, or because we just cleared them on the above emit. - // We could assert this but it is very expensive. - - // Since we are processing an on-time firing we should schedule the garbage collection - // timer. (If getAllowedLateness is zero then the timer event will be considered a - // cleanup event and handled by the above). - // Note we must do this even if the trigger is finished so that we are sure to cleanup - // any final trigger finished bits. - checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), - "Unexpected zero getAllowedLateness"); - Instant cleanupTime = garbageCollectionTime(directContext.window()); - WindowTracing.debug( - "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Cleanup time %s is beyond end-of-time", cleanupTime); - directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); - } + checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Cleanup time %s is beyond end-of-time", cleanupTime); + directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); } } } @@ -793,7 +666,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme * Clear all the state associated with {@code context}'s window. * Should only be invoked if we know all future elements for this window will be considered * beyond allowed lateness. - * This is a superset of the clearing done by {@link #emitPane} below since: + * This is a superset of the clearing done by {@link #emitIfAppropriate} below since: * <ol> * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is * closed again. @@ -819,10 +692,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme } else { // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2). // For (1), if !activeWindows.isActive then the window must be merging and has been - // explicitly removed by emit. But in that case the trigger must have fired + // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired // and been closed, so this case reduces to (2). // For (2), if triggerRunner.isClosed then the trigger was fired and entered the - // closed state. In that case emit will have cleared all state in + // closed state. In that case emitIfAppropriate will have cleared all state in // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows. // We also know nonEmptyPanes must have been unconditionally cleared by the trigger. // Since the trigger fired the existing watermark holds must have been cleared, and since @@ -864,23 +737,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme return false; } - private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context directContext, - ReduceFn<K, InputT, OutputT, W>.Context renamedContext) { - triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); - triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); - triggerRunner.prefetchIsClosed(directContext.state()); - prefetchOnTrigger(directContext, renamedContext); - } - /** - * Emit if a trigger is ready to fire or timers require it, and cleanup state. + * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state. */ - private void emit( - ReduceFn<K, InputT, OutputT, W>.Context directContext, + private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext) throws Exception { - checkState(triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())); + if (!triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + // Ignore unless trigger is ready to fire + return; + } // Inform the trigger of the transition to see if it is finished triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state()); @@ -915,7 +782,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme } /** - * Do we need to emit? + * Do we need to emit a pane? */ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { if (!isEmpty) { @@ -933,15 +800,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme return false; } - private void prefetchOnTrigger( - final ReduceFn<K, InputT, OutputT, W>.Context directContext, - ReduceFn<K, InputT, OutputT, W>.Context renamedContext) { - paneInfoTracker.prefetchPaneInfo(directContext); - watermarkHold.prefetchExtract(renamedContext); - nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); - reduceFn.prefetchOnTrigger(directContext.state()); - } - /** * Run the {@link ReduceFn#onTrigger} method and produce any necessary output. * @@ -955,17 +813,25 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); - // Calculate the pane info. - final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); + // Prefetch necessary states + ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture = + watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); + ReadableState<PaneInfo> paneFuture = + paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); + ReadableState<Boolean> isEmptyFuture = + nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); + reduceFn.prefetchOnTrigger(directContext.state()); + triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); + + // Calculate the pane info. + final PaneInfo pane = paneFuture.read(); // Extract the window hold, and as a side effect clear it. - final WatermarkHold.OldAndNewHolds pair = - watermarkHold.extractAndRelease(renamedContext, isFinished).read(); + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; - final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read(); - if (newHold != null) { // We can't be finished yet. checkState( @@ -997,11 +863,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme } // Only emit a pane if it has data or empty panes are observable. - if (needToEmit(isEmpty, isFinished, pane.getTiming())) { + if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. final List<W> windows = Collections.singletonList(directContext.window()); ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext = - contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED, + contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED, new OnTriggerCallbacks<OutputT>() { @Override public void output(OutputT toOutput) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 7f1afcc..3c04571 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -444,11 +444,6 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { } } - public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) { - context.state().access(elementHoldTag).readLater(); - context.state().access(EXTRA_HOLD_TAG).readLater(); - } - /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index 2f277eb..9f03216 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -99,25 +99,25 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> { return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); } - public void prefetchIsClosed(StateAccessor<?> state) { + public void prefetchForValue(W window, StateAccessor<?> state) { if (isFinishedSetNeeded()) { state.access(FINISHED_BITS_TAG).readLater(); } - } - - public void prefetchForValue(W window, StateAccessor<?> state) { - prefetchIsClosed(state); rootTrigger.getSpec().prefetchOnElement( contextFactory.createStateAccessor(window, rootTrigger)); } public void prefetchOnFire(W window, StateAccessor<?> state) { - prefetchIsClosed(state); + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); } public void prefetchShouldFire(W window, StateAccessor<?> state) { - prefetchIsClosed(state); + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } rootTrigger.getSpec().prefetchShouldFire( contextFactory.createStateAccessor(window, rootTrigger)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 8be8ae5..337be23 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -509,10 +509,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception { ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); - ArrayList timers = new ArrayList(1); - timers.add( + runner.onTimer( TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain)); - runner.onTimers(timers); runner.persist(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index f70fb94..9d25bc6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -200,7 +201,9 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { // Drop any elements within expired windows reduceFnRunner.processElements( dropExpiredWindows(key, workItem.elementsIterable(), timerInternals)); - reduceFnRunner.onTimers(workItem.timersIterable()); + for (TimerData timer : workItem.timersIterable()) { + reduceFnRunner.onTimer(timer); + } reduceFnRunner.persist(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index f8b1222..daa8a06 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -466,10 +466,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { private static TimerCallback collectInto(final List<TimerInternals.TimerData> firedTimers) { return new TimerCallback() { @Override - public void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception { - for (TimerInternals.TimerData timer : timers) { - firedTimers.add(timer); - } + public void onTimer(TimerInternals.TimerData timer) throws Exception { + firedTimers.add(timer); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index f1ddaac..a3bb45a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.base.MoreObjects; -import java.util.ArrayList; import java.util.HashSet; import java.util.PriorityQueue; import java.util.Set; @@ -236,20 +235,13 @@ public class InMemoryTimerInternals implements TimerInternals { throws Exception { checkNotNull(timerCallback); PriorityQueue<TimerData> queue = queue(domain); - while (true) { - ArrayList<TimerData> firedTimers = new ArrayList(); - while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { - // Remove before firing, so that if the callback adds another identical - // timer we don't remove it. - TimerData timer = queue.remove(); - firedTimers.add(timer); - WindowTracing.trace( - "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); - } - if (firedTimers.isEmpty()) { - break; - } - timerCallback.onTimers(firedTimers); + while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { + // Remove before firing, so that if the callback adds another identical + // timer we don't remove it. + TimerData timer = queue.remove(); + WindowTracing.trace( + "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); + timerCallback.onTimer(timer); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java index dfdfd5b..6598e30 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java @@ -19,17 +19,16 @@ package org.apache.beam.sdk.util.state; import org.apache.beam.sdk.util.TimerInternals; - /** - * A callback that processes an Iterable of {@link TimerInternals.TimerData TimerData}. + * A callback that processes a {@link TimerInternals.TimerData TimerData}. */ public interface TimerCallback { - /** Processes an Iterable of {@link TimerInternals.TimerData TimerData}. */ - void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception; + /** Processes the {@link TimerInternals.TimerData TimerData}. */ + void onTimer(TimerInternals.TimerData timer) throws Exception; TimerCallback NO_OP = new TimerCallback() { @Override - public void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception { + public void onTimer(TimerInternals.TimerData timer) throws Exception { // Nothing } }; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java index a3a7749..951803a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -17,11 +17,6 @@ */ package org.apache.beam.sdk.util.state; -import static org.mockito.Matchers.argThat; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.joda.time.Instant; @@ -29,7 +24,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -50,37 +44,6 @@ public class InMemoryTimerInternalsTest { MockitoAnnotations.initMocks(this); } - private static class TimersAre extends ArgumentMatcher<Iterable<TimerData>> { - final List<TimerData> expectedTimers; - TimersAre(List<TimerData> timers) { - expectedTimers = timers; - } - - @Override - public boolean matches(Object actual) { - if (actual == null || !(actual instanceof Iterable)) { - return false; - } - @SuppressWarnings("unchecked") - Iterable<TimerData> timers = (Iterable<TimerData>) actual; - - List<TimerData> actualTimers = new ArrayList(); - for (TimerData timer : timers) { - actualTimers.add(timer); - } - return expectedTimers.equals(actualTimers); - } - - @Override - public String toString() { - return "ordered timers " + expectedTimers.toString(); - } - } - - private static TimersAre timersAre(TimerData... timers) { - return new TimersAre(Arrays.asList(timers)); - } - @Test public void testFiringTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); @@ -91,7 +54,7 @@ public class InMemoryTimerInternalsTest { underTest.setTimer(processingTime2); underTest.advanceProcessingTime(timerCallback, new Instant(20)); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1))); + Mockito.verify(timerCallback).onTimer(processingTime1); Mockito.verifyNoMoreInteractions(timerCallback); // Advancing just a little shouldn't refire @@ -100,13 +63,13 @@ public class InMemoryTimerInternalsTest { // Adding the timer and advancing a little should refire underTest.setTimer(processingTime1); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1))); + Mockito.verify(timerCallback).onTimer(processingTime1); underTest.advanceProcessingTime(timerCallback, new Instant(21)); Mockito.verifyNoMoreInteractions(timerCallback); // And advancing the rest of the way should still have the other timer underTest.advanceProcessingTime(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime2))); + Mockito.verify(timerCallback).onTimer(processingTime2); Mockito.verifyNoMoreInteractions(timerCallback); } @@ -124,11 +87,13 @@ public class InMemoryTimerInternalsTest { underTest.setTimer(watermarkTime2); underTest.advanceInputWatermark(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime1, watermarkTime2))); + Mockito.verify(timerCallback).onTimer(watermarkTime1); + Mockito.verify(timerCallback).onTimer(watermarkTime2); Mockito.verifyNoMoreInteractions(timerCallback); underTest.advanceProcessingTime(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1, processingTime2))); + Mockito.verify(timerCallback).onTimer(processingTime1); + Mockito.verify(timerCallback).onTimer(processingTime2); Mockito.verifyNoMoreInteractions(timerCallback); } @@ -142,9 +107,10 @@ public class InMemoryTimerInternalsTest { underTest.setTimer(processingTime); underTest.setTimer(processingTime); underTest.advanceProcessingTime(timerCallback, new Instant(20)); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime))); underTest.advanceInputWatermark(timerCallback, new Instant(20)); - Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime))); + + Mockito.verify(timerCallback).onTimer(processingTime); + Mockito.verify(timerCallback).onTimer(watermarkTime); Mockito.verifyNoMoreInteractions(timerCallback); } }