Repository: incubator-beam
Updated Branches:
  refs/heads/master 4b682039d -> c8f2cdb22


Revert "Improvements to ReduceFnRunner prefetching"

This reverts commit 4282c67c5fa4dea2fe6c8695e0ea23f383c6457b, which
contained some incompatibilities outside of runners-core.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaa3b91e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaa3b91e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaa3b91e

Branch: refs/heads/master
Commit: aaa3b91e1e7b39dd585314a6017235cdd127e923
Parents: 4b68203
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Nov 30 15:21:53 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Nov 30 15:21:53 2016 -0800

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  11 +-
 .../beam/runners/core/PaneInfoTracker.java      |   4 -
 .../runners/core/ReduceFnContextFactory.java    |   9 +-
 .../beam/runners/core/ReduceFnRunner.java       | 488 +++++++------------
 .../apache/beam/runners/core/WatermarkHold.java |   5 -
 .../triggers/TriggerStateMachineRunner.java     |  14 +-
 .../beam/runners/core/ReduceFnTester.java       |   4 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   5 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   6 +-
 .../sdk/util/state/InMemoryTimerInternals.java  |  22 +-
 .../beam/sdk/util/state/TimerCallback.java      |   9 +-
 .../util/state/InMemoryTimerInternalsTest.java  |  54 +-
 12 files changed, 224 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 294f21d..8b10813 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -72,9 +73,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   @Override
   public void processElement(ProcessContext c) throws Exception {
-    KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
+    KeyedWorkItem<K, InputT> element = c.element();
 
-    K key = keyedWorkItem.key();
+    K key = c.element().key();
     TimerInternals timerInternals = c.windowingInternals().timerInternals();
     StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
 
@@ -92,8 +93,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             reduceFn,
             c.getPipelineOptions());
 
-    reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
-    reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
+    reduceFnRunner.processElements(element.elementsIterable());
+    for (TimerData timer : element.timersIterable()) {
+      reduceFnRunner.onTimer(timer);
+    }
     reduceFnRunner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 69a4cfd..8140243 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -54,10 +54,6 @@ public class PaneInfoTracker {
     state.access(PANE_INFO_TAG).clear();
   }
 
-  public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
-    context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
-  }
-
   /**
    * Return a ({@link ReadableState} for) the pane info appropriate for {@code 
context}. The pane
    * info includes the timing for the pane, who's calculation is quite subtle.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index c5bda9b..539126a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateAccessor;
 import org.apache.beam.sdk.util.state.StateContext;
@@ -116,7 +117,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
   }
 
   public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
-      PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
+      ReadableState<PaneInfo> pane, StateStyle style, 
OnTriggerCallbacks<OutputT> callbacks) {
     return new OnTriggerContextImpl(stateAccessor(window, style), pane, 
callbacks);
   }
 
@@ -388,11 +389,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W 
extends BoundedWindow> {
 
   private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, 
W>.OnTriggerContext {
     private final StateAccessorImpl<K, W> state;
-    private final PaneInfo pane;
+    private final ReadableState<PaneInfo> pane;
     private final OnTriggerCallbacks<OutputT> callbacks;
     private final TimersImpl timers;
 
-    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
+    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, 
ReadableState<PaneInfo> pane,
         OnTriggerCallbacks<OutputT> callbacks) {
       reduceFn.super();
       this.state = state;
@@ -423,7 +424,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
 
     @Override
     public PaneInfo paneInfo() {
-      return pane;
+      return pane.read();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 3a82be9..a686f46 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -21,15 +21,12 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -61,6 +58,7 @@ import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
 import org.apache.beam.sdk.util.state.TimerCallback;
@@ -270,32 +268,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
     return activeWindows.getActiveAndNewWindows().isEmpty();
   }
 
-  private Set<W> openWindows(Collection<W> windows) {
-    Set<W> result = new HashSet<>();
-    for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext = 
contextFactory.base(
-          window, StateStyle.DIRECT);
-      if (!triggerRunner.isClosed(directContext.state())) {
-        result.add(window);
-      }
-    }
-    return result;
-  }
-
-  private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception 
{
-    Collection<W> result = new ArrayList<>();
-    // Filter out timers that didn't trigger.
-    for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext =
-          contextFactory.base(window, StateStyle.DIRECT);
-      if (triggerRunner.shouldFire(
-          directContext.window(), directContext.timers(), 
directContext.state())) {
-        result.add(window);
-      }
-    }
-    return result;
-  }
-
   /**
    * Incorporate {@code values} into the underlying reduce function, and 
manage holds, timers,
    * triggers, and window merging.
@@ -321,54 +293,25 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
    * </ol>
    */
   public void processElements(Iterable<WindowedValue<InputT>> values) throws 
Exception {
-    if (!values.iterator().hasNext()) {
-      return;
-    }
-
-    // Determine all the windows for elements.
-    Set<W> windows = collectWindows(values);
     // If an incoming element introduces a new window, attempt to merge it 
into an existing
     // window eagerly.
-    Map<W, W> windowToMergeResult = mergeWindows(windows);
-    if (!windowToMergeResult.isEmpty()) {
-      // Update windows by removing all windows that were merged away and 
adding
-      // the windows they were merged to. We add after completing all the
-      // removals to avoid removing a window that was also added.
-      List<W> addedWindows = new ArrayList<>(windowToMergeResult.size());
-      for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) {
-        windows.remove(entry.getKey());
-        addedWindows.add(entry.getValue());
-      }
-      windows.addAll(addedWindows);
-    }
+    Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
 
-    prefetchWindowsForValues(windows);
-
-    // All windows that are open before element processing may need to fire.
-    Set<W> windowsToConsider = openWindows(windows);
+    Set<W> windowsToConsider = new HashSet<>();
 
     // Process each element, using the updated activeWindows determined by 
collectAndMergeWindows.
     for (WindowedValue<InputT> value : values) {
-      processElement(windowToMergeResult, value);
+      windowsToConsider.addAll(processElement(windowToMergeResult, value));
     }
 
-    // Now that we've processed the elements, see if any of the windows need 
to fire.
-    // Prefetch state necessary to determine if the triggers should fire.
+    // Trigger output from any window for which the trigger is ready
     for (W mergedWindow : windowsToConsider) {
-      triggerRunner.prefetchShouldFire(
-          mergedWindow, contextFactory.base(mergedWindow, 
StateStyle.DIRECT).state());
-    }
-    // Filter to windows that are firing.
-    Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider);
-    // Prefetch windows that are firing.
-    for (W window : windowsToFire) {
-      prefetchEmit(contextFactory.base(window, StateStyle.DIRECT),
-          contextFactory.base(window, StateStyle.RENAMED));
-    }
-    // Trigger output from firing windows.
-    for (W window : windowsToFire) {
-      emit(contextFactory.base(window, StateStyle.DIRECT),
-          contextFactory.base(window, StateStyle.RENAMED));
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(mergedWindow, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+          contextFactory.base(mergedWindow, StateStyle.RENAMED);
+      triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
+      emitIfAppropriate(directContext, renamedContext);
     }
 
     // We're all done with merging and emitting elements so can compress the 
activeWindow state.
@@ -382,61 +325,52 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
   }
 
   /**
-   * Extract the windows associated with the values.
+   * Extract the windows associated with the values, and invoke merge. Return 
a map
+   * from windows to the merge result window. If a window is not in the domain 
of
+   * the result map then it did not get merged into a different window.
    */
-  private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws 
Exception {
-    Set<W> windows = new HashSet<>();
-    for (WindowedValue<?> value : values) {
-      for (BoundedWindow untypedWindow : value.getWindows()) {
-        @SuppressWarnings("unchecked")
-          W window = (W) untypedWindow;
-        windows.add(window);
-      }
-    }
-    return windows;
-  }
-
-  /**
-   * Invoke merge for the given windows and return a map from windows to the
-   * merge result window. Windows that were not merged are not present in the
-   * map.
-   */
-  private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
+  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> 
values)
+      throws Exception {
+    // No-op if no merging can take place
     if (windowingStrategy.getWindowFn().isNonMerging()) {
-      // Return an empty map, indicating that every window is not merged.
-      return Collections.emptyMap();
+      return ImmutableMap.of();
     }
 
-    Map<W, W> windowToMergeResult = new HashMap<>();
     // Collect the windows from all elements (except those which are too late) 
and
     // make sure they are already in the active window set or are added as NEW 
windows.
-    for (W window : windows) {
-      // For backwards compat with pre 1.4 only.
-      // We may still have ACTIVE windows with multiple state addresses, 
representing
-      // a window who's state has not yet been eagerly merged.
-      // We'll go ahead and merge that state now so that we don't have to 
worry about
-      // this legacy case anywhere else.
-      if (activeWindows.isActive(window)) {
-        Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
-        if (stateAddressWindows.size() > 1) {
-          // This is a legacy window who's state has not been eagerly merged.
-          // Do that now.
-          ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
-              contextFactory.forPremerge(window);
-          reduceFn.onMerge(premergeContext);
-          watermarkHold.onMerge(premergeContext);
-          activeWindows.merged(window);
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow untypedWindow : value.getWindows()) {
+        @SuppressWarnings("unchecked")
+        W window = (W) untypedWindow;
+
+        // For backwards compat with pre 1.4 only.
+        // We may still have ACTIVE windows with multiple state addresses, 
representing
+        // a window who's state has not yet been eagerly merged.
+        // We'll go ahead and merge that state now so that we don't have to 
worry about
+        // this legacy case anywhere else.
+        if (activeWindows.isActive(window)) {
+          Set<W> stateAddressWindows = 
activeWindows.readStateAddresses(window);
+          if (stateAddressWindows.size() > 1) {
+            // This is a legacy window who's state has not been eagerly merged.
+            // Do that now.
+            ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
+                contextFactory.forPremerge(window);
+            reduceFn.onMerge(premergeContext);
+            watermarkHold.onMerge(premergeContext);
+            activeWindows.merged(window);
+          }
         }
-      }
 
-      // Add this window as NEW if it is not currently ACTIVE.
-      // If we had already seen this window and closed its trigger, then the
-      // window will not be currently ACTIVE. It will then be added as NEW 
here,
-      // and fall into the merging logic as usual.
-      activeWindows.ensureWindowExists(window);
+        // Add this window as NEW if it is not currently ACTIVE.
+        // If we had already seen this window and closed its trigger, then the
+        // window will not be currently ACTIVE. It will then be added as NEW 
here,
+        // and fall into the merging logic as usual.
+        activeWindows.ensureWindowExists(window);
+      }
     }
 
     // Merge all of the active windows and retain a mapping from source 
windows to result windows.
+    Map<W, W> windowToMergeResult = new HashMap<>();
     activeWindows.merge(new OnMergeCallback(windowToMergeResult));
     return windowToMergeResult;
   }
@@ -538,50 +472,38 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
   }
 
   /**
-   * Redirect element windows to the ACTIVE windows they have been merged into.
-   * The compressed representation (value, {window1, window2, ...}) actually 
represents
-   * distinct elements (value, window1), (value, window2), ...
-   * so if window1 and window2 merge, the resulting window will contain both 
copies
-   * of the value.
+   * Process an element.
+   *
+   * @param value the value being processed
+   * @return the set of windows in which the element was actually processed
    */
-  private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult,
-      final Collection<? extends BoundedWindow> windows) {
-    return ImmutableSet.copyOf(
-        FluentIterable.from(windows).transform(
-            new Function<BoundedWindow, W>() {
-              @Override
-              public W apply(BoundedWindow untypedWindow) {
-                @SuppressWarnings("unchecked")
-                W window = (W) untypedWindow;
-                W mergedWindow = windowToMergeResult.get(window);
-                // If the element is not present in the map, the window is 
unmerged.
-                return (mergedWindow == null) ? window : mergedWindow;
-              }
-            }
-        ));
-  }
+  private Collection<W> processElement(Map<W, W> windowToMergeResult, 
WindowedValue<InputT> value)
+      throws Exception {
+    // Redirect element windows to the ACTIVE windows they have been merged 
into.
+    // The compressed representation (value, {window1, window2, ...}) actually 
represents
+    // distinct elements (value, window1), (value, window2), ...
+    // so if window1 and window2 merge, the resulting window will contain both 
copies
+    // of the value.
+    Collection<W> windows = new ArrayList<>();
+    for (BoundedWindow untypedWindow : value.getWindows()) {
+      @SuppressWarnings("unchecked")
+      W window = (W) untypedWindow;
+      W mergeResult = windowToMergeResult.get(window);
+      if (mergeResult == null) {
+        mergeResult = window;
+      }
+      windows.add(mergeResult);
+    }
 
-  private void prefetchWindowsForValues(Collection<W> windows) {
     // Prefetch in each of the windows if we're going to need to process 
triggers
     for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext = 
contextFactory.base(
-          window, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = 
contextFactory.forValue(
+          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
       triggerRunner.prefetchForValue(window, directContext.state());
     }
-  }
-
-  /**
-   * Process an element.
-   *
-   * @param windowToMergeResult map of windows to merged windows. If a window 
is
-   * not present it is unmerged.
-   * @param value the value being processed
-   */
-  private void processElement(Map<W, W> windowToMergeResult, 
WindowedValue<InputT> value)
-      throws Exception {
-    ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, 
value.getWindows());
 
     // Process the element for each (mergeResultWindow, not closed) window it 
belongs to.
+    List<W> triggerableWindows = new ArrayList<>(windows.size());
     for (W window : windows) {
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = 
contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
@@ -596,6 +518,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
         continue;
       }
 
+      triggerableWindows.add(window);
       activeWindows.ensureWindowIsActive(window);
       ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = 
contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
@@ -639,152 +562,102 @@ public class ReduceFnRunner<K, InputT, OutputT, W 
extends BoundedWindow> impleme
       // cannot take a trigger state from firing to non-firing.
       // (We don't actually assert this since it is too slow.)
     }
+
+    return triggerableWindows;
   }
 
   /**
-   * Enriches TimerData with state necessary for processing a timer as well as
-   * common queries about a timer.
+   * Called when an end-of-window, garbage collection, or trigger-specific 
timer fires.
    */
-  private class EnrichedTimerData {
-    public final Instant timestamp;
-    public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
-    public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
-    // If this is an end-of-window timer then we may need to set a garbage 
collection timer
-    // if allowed lateness is non-zero.
-    public final boolean isEndOfWindow;
-    // If this is a garbage collection timer then we should trigger and
-    // garbage collect the window.  We'll consider any timer at or after the
-    // end-of-window time to be a signal to garbage collect.
-    public final boolean isGarbageCollection;
-
-    EnrichedTimerData(
-        TimerData timer,
-        ReduceFn<K, InputT, OutputT, W>.Context directContext,
-        ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
-      this.timestamp = timer.getTimestamp();
-      this.directContext = directContext;
-      this.renamedContext = renamedContext;
-      W window = directContext.window();
-      this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-          && timer.getTimestamp().equals(window.maxTimestamp());
-      Instant cleanupTime = garbageCollectionTime(window);
-      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
-    }
+  public void onTimer(TimerData timer) throws Exception {
+    // Which window is the timer for?
+    checkArgument(timer.getNamespace() instanceof WindowNamespace,
+        "Expected timer to be in WindowNamespace, but was in %s", 
timer.getNamespace());
+    @SuppressWarnings("unchecked")
+    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) 
timer.getNamespace();
+    W window = windowNamespace.getWindow();
+    ReduceFn<K, InputT, OutputT, W>.Context directContext =
+        contextFactory.base(window, StateStyle.DIRECT);
+    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+        contextFactory.base(window, StateStyle.RENAMED);
 
     // Has this window had its trigger finish?
     // - The trigger may implement isClosed as constant false.
     // - If the window function does not support windowing then all windows 
will be considered
     // active.
     // So we must take conjunction of activeWindows and triggerRunner state.
-    public boolean windowIsActiveAndOpen() {
-      return activeWindows.isActive(directContext.window())
-          && !triggerRunner.isClosed(directContext.state());
-    }
-  }
+    boolean windowIsActiveAndOpen =
+        activeWindows.isActive(window) && 
!triggerRunner.isClosed(directContext.state());
 
-  public void onTimers(Iterable<TimerData> timers) throws Exception {
-    if (!timers.iterator().hasNext()) {
-      return;
+    if (!windowIsActiveAndOpen) {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window 
{}", timer, window);
     }
 
-    // Create a reusable context for each timer and begin prefetching necessary
-    // state.
-    List<EnrichedTimerData> enrichedTimers = new LinkedList();
-    for (TimerData timer : timers) {
-      checkArgument(timer.getNamespace() instanceof WindowNamespace,
-          "Expected timer to be in WindowNamespace, but was in %s", 
timer.getNamespace());
-      @SuppressWarnings("unchecked")
-        WindowNamespace<W> windowNamespace = (WindowNamespace<W>) 
timer.getNamespace();
-      W window = windowNamespace.getWindow();
-      ReduceFn<K, InputT, OutputT, W>.Context directContext =
-          contextFactory.base(window, StateStyle.DIRECT);
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-          contextFactory.base(window, StateStyle.RENAMED);
-      EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, 
directContext, renamedContext);
-      enrichedTimers.add(enrichedTimer);
-
-      // Perform prefetching of state to determine if the trigger should fire.
-      if (enrichedTimer.isGarbageCollection) {
-        triggerRunner.prefetchIsClosed(directContext.state());
-      } else {
-        triggerRunner.prefetchShouldFire(directContext.window(), 
directContext.state());
+    // If this is an end-of-window timer then we may need to set a garbage 
collection timer
+    // if allowed lateness is non-zero.
+    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+        && timer.getTimestamp().equals(window.maxTimestamp());
+
+    // If this is a garbage collection timer then we should trigger and 
garbage collect the window.
+    // We'll consider any timer at or after the end-of-window time to be a 
signal to garbage
+    // collect.
+    Instant cleanupTime = garbageCollectionTime(window);
+    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
+        && !timer.getTimestamp().isBefore(cleanupTime);
+
+    if (isGarbageCollection) {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} 
with "
+          + "inputWatermark:{}; outputWatermark:{}",
+          key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+
+      if (windowIsActiveAndOpen) {
+        // We need to call onTrigger to emit the final pane if required.
+        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been 
emitted,
+        // and the watermark has passed the end of the window.
+        @Nullable Instant newHold =
+            onTrigger(directContext, renamedContext, true/* isFinished */, 
isEndOfWindow);
+        checkState(newHold == null,
+            "Hold placed at %s despite isFinished being true.", newHold);
       }
-    }
 
-    // For those windows that are active and open, prefetch the triggering or 
emitting state.
-    for (EnrichedTimerData timer : enrichedTimers) {
-      if (timer.windowIsActiveAndOpen()) {
-        ReduceFn<K, InputT, OutputT, W>.Context directContext = 
timer.directContext;
-        if (timer.isGarbageCollection) {
-          prefetchOnTrigger(directContext, timer.renamedContext);
-        } else if (triggerRunner.shouldFire(
-            directContext.window(), directContext.timers(), 
directContext.state())) {
-          prefetchEmit(directContext, timer.renamedContext);
-        }
+      // Cleanup flavor B: Clear all the remaining state for this window since 
we'll never
+      // see elements for it again.
+      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+    } else {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with 
"
+          + "inputWatermark:{}; outputWatermark:{}",
+          key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+      if (windowIsActiveAndOpen) {
+        emitIfAppropriate(directContext, renamedContext);
       }
-    }
-
-    // Perform processing now that everything is prefetched.
-    for (EnrichedTimerData timer : enrichedTimers) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext = 
timer.directContext;
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = 
timer.renamedContext;
 
-      if (timer.isGarbageCollection) {
-        WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; 
window:{} at {} with "
-                + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), timer.timestamp,
-            timerInternals.currentInputWatermarkTime(),
-            timerInternals.currentOutputWatermarkTime());
-
-        boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
-        if (windowIsActiveAndOpen) {
-          // We need to call onTrigger to emit the final pane if required.
-          // The final pane *may* be ON_TIME if no prior ON_TIME pane has been 
emitted,
-          // and the watermark has passed the end of the window.
-          @Nullable
-          Instant newHold = onTrigger(
-              directContext, renamedContext, true /* isFinished */, 
timer.isEndOfWindow);
-          checkState(newHold == null, "Hold placed at %s despite isFinished 
being true.", newHold);
-        }
-
-        // Cleanup flavor B: Clear all the remaining state for this window 
since we'll never
-        // see elements for it again.
-        clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
-      } else {
-        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; 
window:{} at {} with "
-                + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), timer.timestamp,
-            timerInternals.currentInputWatermarkTime(),
+      if (isEndOfWindow) {
+        // If the window strategy trigger includes a watermark trigger then at 
this point
+        // there should be no data holds, either because we'd already cleared 
them on an
+        // earlier onTrigger, or because we just cleared them on the above 
emitIfAppropriate.
+        // We could assert this but it is very expensive.
+
+        // Since we are processing an on-time firing we should schedule the 
garbage collection
+        // timer. (If getAllowedLateness is zero then the timer event will be 
considered a
+        // cleanup event and handled by the above).
+        // Note we must do this even if the trigger is finished so that we are 
sure to cleanup
+        // any final trigger finished bits.
+        checkState(
+            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+            "Unexpected zero getAllowedLateness");
+        WindowTracing.debug(
+            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; 
window:{} at {} with "
+            + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), cleanupTime, 
timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
-        if (timer.windowIsActiveAndOpen()
-            && triggerRunner.shouldFire(
-                   directContext.window(), directContext.timers(), 
directContext.state())) {
-          emit(directContext, renamedContext);
-        }
-
-        if (timer.isEndOfWindow) {
-          // If the window strategy trigger includes a watermark trigger then 
at this point
-          // there should be no data holds, either because we'd already 
cleared them on an
-          // earlier onTrigger, or because we just cleared them on the above 
emit.
-          // We could assert this but it is very expensive.
-
-          // Since we are processing an on-time firing we should schedule the 
garbage collection
-          // timer. (If getAllowedLateness is zero then the timer event will 
be considered a
-          // cleanup event and handled by the above).
-          // Note we must do this even if the trigger is finished so that we 
are sure to cleanup
-          // any final trigger finished bits.
-          
checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
-              "Unexpected zero getAllowedLateness");
-          Instant cleanupTime = garbageCollectionTime(directContext.window());
-          WindowTracing.debug(
-              "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; 
window:{} at {} with "
-                  + "inputWatermark:{}; outputWatermark:{}",
-              key, directContext.window(), cleanupTime, 
timerInternals.currentInputWatermarkTime(),
-              timerInternals.currentOutputWatermarkTime());
-          checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-              "Cleanup time %s is beyond end-of-time", cleanupTime);
-          directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
-        }
+        checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                                 "Cleanup time %s is beyond end-of-time", 
cleanupTime);
+        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
       }
     }
   }
@@ -793,7 +666,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
    * Clear all the state associated with {@code context}'s window.
    * Should only be invoked if we know all future elements for this window 
will be considered
    * beyond allowed lateness.
-   * This is a superset of the clearing done by {@link #emitPane} below since:
+   * This is a superset of the clearing done by {@link #emitIfAppropriate} 
below since:
    * <ol>
    * <li>We can clear the trigger finished bits since we'll never need to ask 
if the trigger is
    * closed again.
@@ -819,10 +692,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
     } else {
       // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or 
triggerRunner.isClosed (2).
       // For (1), if !activeWindows.isActive then the window must be merging 
and has been
-      // explicitly removed by emit. But in that case the trigger must have 
fired
+      // explicitly removed by emitIfAppropriate. But in that case the trigger 
must have fired
       // and been closed, so this case reduces to (2).
       // For (2), if triggerRunner.isClosed then the trigger was fired and 
entered the
-      // closed state. In that case emit will have cleared all state in
+      // closed state. In that case emitIfAppropriate will have cleared all 
state in
       // reduceFn, triggerRunner (except for finished bits), paneInfoTracker 
and activeWindows.
       // We also know nonEmptyPanes must have been unconditionally cleared by 
the trigger.
       // Since the trigger fired the existing watermark holds must have been 
cleared, and since
@@ -864,23 +737,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
     return false;
   }
 
-  private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context 
directContext,
-                                ReduceFn<K, InputT, OutputT, W>.Context 
renamedContext) {
-    triggerRunner.prefetchShouldFire(directContext.window(), 
directContext.state());
-    triggerRunner.prefetchOnFire(directContext.window(), 
directContext.state());
-    triggerRunner.prefetchIsClosed(directContext.state());
-    prefetchOnTrigger(directContext, renamedContext);
-  }
-
   /**
-   * Emit if a trigger is ready to fire or timers require it, and cleanup 
state.
+   * Possibly emit a pane if a trigger is ready to fire or timers require it, 
and cleanup state.
    */
-  private void emit(
-      ReduceFn<K, InputT, OutputT, W>.Context directContext,
+  private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context 
directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
       throws Exception {
-    checkState(triggerRunner.shouldFire(
-        directContext.window(), directContext.timers(), 
directContext.state()));
+    if (!triggerRunner.shouldFire(
+        directContext.window(), directContext.timers(), 
directContext.state())) {
+      // Ignore unless trigger is ready to fire
+      return;
+    }
 
     // Inform the trigger of the transition to see if it is finished
     triggerRunner.onFire(directContext.window(), directContext.timers(), 
directContext.state());
@@ -915,7 +782,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
   }
 
   /**
-   * Do we need to emit?
+   * Do we need to emit a pane?
    */
   private boolean needToEmit(boolean isEmpty, boolean isFinished, 
PaneInfo.Timing timing) {
     if (!isEmpty) {
@@ -933,15 +800,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
     return false;
   }
 
-  private void prefetchOnTrigger(
-      final ReduceFn<K, InputT, OutputT, W>.Context directContext,
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
-    paneInfoTracker.prefetchPaneInfo(directContext);
-    watermarkHold.prefetchExtract(renamedContext);
-    nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-    reduceFn.prefetchOnTrigger(directContext.state());
-  }
-
   /**
    * Run the {@link ReduceFn#onTrigger} method and produce any necessary 
output.
    *
@@ -955,17 +813,25 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
           throws Exception {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
 
-    // Calculate the pane info.
-    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, 
isFinished).read();
+    // Prefetch necessary states
+    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
+        watermarkHold.extractAndRelease(renamedContext, 
isFinished).readLater();
+    ReadableState<PaneInfo> paneFuture =
+        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
+    ReadableState<Boolean> isEmptyFuture =
+        nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
 
+    reduceFn.prefetchOnTrigger(directContext.state());
+    triggerRunner.prefetchOnFire(directContext.window(), 
directContext.state());
+
+    // Calculate the pane info.
+    final PaneInfo pane = paneFuture.read();
     // Extract the window hold, and as a side effect clear it.
-    final WatermarkHold.OldAndNewHolds pair =
-        watermarkHold.extractAndRelease(renamedContext, isFinished).read();
+
+    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
-    final boolean isEmpty = 
nonEmptyPanes.isEmpty(renamedContext.state()).read();
-
     if (newHold != null) {
       // We can't be finished yet.
       checkState(
@@ -997,11 +863,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme
     }
 
     // Only emit a pane if it has data or empty panes are observable.
-    if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
+    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.
       final List<W> windows = 
Collections.singletonList(directContext.window());
       ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
-          contextFactory.forTrigger(directContext.window(), pane, 
StateStyle.RENAMED,
+          contextFactory.forTrigger(directContext.window(), paneFuture, 
StateStyle.RENAMED,
               new OnTriggerCallbacks<OutputT>() {
                 @Override
                 public void output(OutputT toOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..3c04571 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -444,11 +444,6 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
     }
   }
 
-  public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) {
-    context.state().access(elementHoldTag).readLater();
-    context.state().access(EXTRA_HOLD_TAG).readLater();
-  }
-
   /**
    * Return (a future for) the earliest hold for {@code context}. Clear all 
the holds after
    * reading, but add/restore an end-of-window or garbage collection hold if 
required.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 2f277eb..9f03216 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -99,25 +99,25 @@ public class TriggerStateMachineRunner<W extends 
BoundedWindow> {
     return 
readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
   }
 
-  public void prefetchIsClosed(StateAccessor<?> state) {
+  public void prefetchForValue(W window, StateAccessor<?> state) {
     if (isFinishedSetNeeded()) {
       state.access(FINISHED_BITS_TAG).readLater();
     }
-  }
-
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    prefetchIsClosed(state);
     rootTrigger.getSpec().prefetchOnElement(
         contextFactory.createStateAccessor(window, rootTrigger));
   }
 
   public void prefetchOnFire(W window, StateAccessor<?> state) {
-    prefetchIsClosed(state);
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
     
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, 
rootTrigger));
   }
 
   public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    prefetchIsClosed(state);
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
     rootTrigger.getSpec().prefetchShouldFire(
         contextFactory.createStateAccessor(window, rootTrigger));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 8be8ae5..337be23 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -509,10 +509,8 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
 
   public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws 
Exception {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    ArrayList timers = new ArrayList(1);
-    timers.add(
+    runner.onTimer(
         TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), 
timestamp, domain));
-    runner.onTimers(timers);
     runner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f70fb94..9d25bc6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -200,7 +201,9 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory {
       // Drop any elements within expired windows
       reduceFnRunner.processElements(
           dropExpiredWindows(key, workItem.elementsIterable(), 
timerInternals));
-      reduceFnRunner.onTimers(workItem.timersIterable());
+      for (TimerData timer : workItem.timersIterable()) {
+        reduceFnRunner.onTimer(timer);
+      }
       reduceFnRunner.persist();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index f8b1222..daa8a06 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -466,10 +466,8 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
   private static TimerCallback collectInto(final 
List<TimerInternals.TimerData> firedTimers) {
     return new TimerCallback() {
       @Override
-      public void onTimers(Iterable<TimerInternals.TimerData> timers) throws 
Exception {
-        for (TimerInternals.TimerData timer : timers) {
-          firedTimers.add(timer);
-        }
+      public void onTimer(TimerInternals.TimerData timer) throws Exception {
+        firedTimers.add(timer);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index f1ddaac..a3bb45a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -21,7 +21,6 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -236,20 +235,13 @@ public class InMemoryTimerInternals implements 
TimerInternals {
       throws Exception {
     checkNotNull(timerCallback);
     PriorityQueue<TimerData> queue = queue(domain);
-    while (true) {
-      ArrayList<TimerData> firedTimers = new ArrayList();
-      while (!queue.isEmpty() && 
currentTime.isAfter(queue.peek().getTimestamp())) {
-        // Remove before firing, so that if the callback adds another identical
-        // timer we don't remove it.
-        TimerData timer = queue.remove();
-        firedTimers.add(timer);
-        WindowTracing.trace(
-            "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, 
currentTime);
-      }
-      if (firedTimers.isEmpty()) {
-        break;
-      }
-      timerCallback.onTimers(firedTimers);
+    while (!queue.isEmpty() && 
currentTime.isAfter(queue.peek().getTimestamp())) {
+      // Remove before firing, so that if the callback adds another identical
+      // timer we don't remove it.
+      TimerData timer = queue.remove();
+      WindowTracing.trace(
+          "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, 
currentTime);
+      timerCallback.onTimer(timer);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
index dfdfd5b..6598e30 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
@@ -19,17 +19,16 @@ package org.apache.beam.sdk.util.state;
 
 import org.apache.beam.sdk.util.TimerInternals;
 
-
 /**
- * A callback that processes an Iterable of {@link TimerInternals.TimerData 
TimerData}.
+ * A callback that processes a {@link TimerInternals.TimerData TimerData}.
  */
 public interface TimerCallback {
-  /** Processes an Iterable of {@link TimerInternals.TimerData TimerData}. */
-  void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception;
+  /** Processes the {@link TimerInternals.TimerData TimerData}. */
+  void onTimer(TimerInternals.TimerData timer) throws Exception;
 
   TimerCallback NO_OP = new TimerCallback() {
     @Override
-    public void onTimers(Iterable<TimerInternals.TimerData> timers) throws 
Exception {
+    public void onTimer(TimerInternals.TimerData timer) throws Exception {
       // Nothing
     }
   };

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
index a3a7749..951803a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -17,11 +17,6 @@
  */
 package org.apache.beam.sdk.util.state;
 
-import static org.mockito.Matchers.argThat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.joda.time.Instant;
@@ -29,7 +24,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -50,37 +44,6 @@ public class InMemoryTimerInternalsTest {
     MockitoAnnotations.initMocks(this);
   }
 
-  private static class TimersAre extends ArgumentMatcher<Iterable<TimerData>> {
-    final List<TimerData> expectedTimers;
-    TimersAre(List<TimerData> timers) {
-      expectedTimers = timers;
-    }
-
-    @Override
-    public boolean matches(Object actual) {
-      if (actual == null || !(actual instanceof Iterable)) {
-        return false;
-      }
-      @SuppressWarnings("unchecked")
-      Iterable<TimerData> timers = (Iterable<TimerData>) actual;
-
-      List<TimerData> actualTimers = new ArrayList();
-      for (TimerData timer : timers) {
-        actualTimers.add(timer);
-      }
-      return expectedTimers.equals(actualTimers);
-    }
-
-    @Override
-    public String toString() {
-      return "ordered timers " + expectedTimers.toString();
-    }
-  }
-
-  private static TimersAre timersAre(TimerData... timers) {
-    return new TimersAre(Arrays.asList(timers));
-  }
-
   @Test
   public void testFiringTimers() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
@@ -91,7 +54,7 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(processingTime2);
 
     underTest.advanceProcessingTime(timerCallback, new Instant(20));
-    
Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1)));
+    Mockito.verify(timerCallback).onTimer(processingTime1);
     Mockito.verifyNoMoreInteractions(timerCallback);
 
     // Advancing just a little shouldn't refire
@@ -100,13 +63,13 @@ public class InMemoryTimerInternalsTest {
 
     // Adding the timer and advancing a little should refire
     underTest.setTimer(processingTime1);
-    
Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1)));
+    Mockito.verify(timerCallback).onTimer(processingTime1);
     underTest.advanceProcessingTime(timerCallback, new Instant(21));
     Mockito.verifyNoMoreInteractions(timerCallback);
 
     // And advancing the rest of the way should still have the other timer
     underTest.advanceProcessingTime(timerCallback, new Instant(30));
-    
Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime2)));
+    Mockito.verify(timerCallback).onTimer(processingTime2);
     Mockito.verifyNoMoreInteractions(timerCallback);
   }
 
@@ -124,11 +87,13 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(watermarkTime2);
 
     underTest.advanceInputWatermark(timerCallback, new Instant(30));
-    Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime1, 
watermarkTime2)));
+    Mockito.verify(timerCallback).onTimer(watermarkTime1);
+    Mockito.verify(timerCallback).onTimer(watermarkTime2);
     Mockito.verifyNoMoreInteractions(timerCallback);
 
     underTest.advanceProcessingTime(timerCallback, new Instant(30));
-    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1, 
processingTime2)));
+    Mockito.verify(timerCallback).onTimer(processingTime1);
+    Mockito.verify(timerCallback).onTimer(processingTime2);
     Mockito.verifyNoMoreInteractions(timerCallback);
   }
 
@@ -142,9 +107,10 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(processingTime);
     underTest.setTimer(processingTime);
     underTest.advanceProcessingTime(timerCallback, new Instant(20));
-    Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime)));
     underTest.advanceInputWatermark(timerCallback, new Instant(20));
-    Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime)));
+
+    Mockito.verify(timerCallback).onTimer(processingTime);
+    Mockito.verify(timerCallback).onTimer(watermarkTime);
     Mockito.verifyNoMoreInteractions(timerCallback);
   }
 }

Reply via email to