http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..b1442dd
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them from {@link #processElementInReadyWindows(WindowedValue)}.
+ *
+ * <p>An element is ready in a main-input window when, for every side input view, the
+ * {@link ReadyCheckingSideInputReader} reports the corresponding side input window as ready.
+ */
+public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  /**
+   * Main-input windows found not ready during the current bundle. Allocated by
+   * {@link #startBundle()} and dropped by {@link #finishBundle()}, so it is {@code null} outside
+   * of a bundle.
+   */
+  private Set<BoundedWindow> notReadyWindows;
+
+  /**
+   * Creates a runner that wraps {@code underlying} and checks readiness of {@code views} through
+   * {@code sideInputReader} before forwarding elements.
+   */
+  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private PushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  /** Resets the set of not-ready windows, then starts the underlying runner's bundle. */
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+   * for each window the element is in that is ready.
+   *
+   * <p>When there are no side input views the element is forwarded as-is, without being exploded
+   * into one value per window. Otherwise this must be called between {@link #startBundle()} and
+   * {@link #finishBundle()}, because the not-ready window set only exists during a bundle.
+   *
+   * @param elem the element to process in all ready windows
+   * @return each element that could not be processed because it requires a side input window
+   * that is not ready.
+   */
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    if (views.isEmpty()) {
+      processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      if (isReady(mainInputWindow)) {
+        processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  /**
+   * Returns whether every side input is ready for {@code mainInputWindow}.
+   *
+   * <p>A window already recorded as not ready in this bundle is rejected immediately, without
+   * asking the side input reader again.
+   */
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    if (notReadyWindows.contains(mainInputWindow)) {
+      return false;
+    }
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal()
+              .getWindowFn()
+              .getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Forwards the element to the underlying runner without any readiness check. */
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    underlying.processElement(elem);
+  }
+
+  /**
+   * Drops the set of not-ready windows, then calls the underlying
+   * {@link DoFnRunner#finishBundle()}.
+   */
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
new file mode 100644
index 0000000..c5ee1e1
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+
+/**
+ * Specification for processing to happen after elements have been grouped by key.
+ *
+ * <p>{@link #processValue}, {@link #onMerge}, {@link #onTrigger} and {@link #clearState} each
+ * receive one of the nested context classes, which expose the key, window, windowing strategy,
+ * state and timers for that invocation.
+ *
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of input values associated with the key.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W> The type of windows this operates on.
+ */
+public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
+    implements Serializable {
+
+  /** Information accessible to all the processing methods in this {@code ReduceFn}. */
+  public abstract class Context {
+    /** Return the key that is being processed. */
+    public abstract K key();
+
+    /** The window that is being processed. */
+    public abstract W window();
+
+    /** Access the current {@link WindowingStrategy}. */
+    public abstract WindowingStrategy<?, W> windowingStrategy();
+
+    /** Return the interface for accessing state. */
+    public abstract StateAccessor<K> state();
+
+    /** Return the interface for accessing timers. */
+    public abstract Timers timers();
+  }
+
+  /** Information accessible within {@link #processValue}. */
+  public abstract class ProcessValueContext extends Context {
+    /** Return the actual value being processed. */
+    public abstract InputT value();
+
+    /** Return the timestamp associated with the value. */
+    public abstract Instant timestamp();
+  }
+
+  /** Information accessible within {@link #onMerge}. */
+  public abstract class OnMergeContext extends Context {
+    /**
+     * Return the interface for accessing state. Narrows the return type of
+     * {@link Context#state()} to a {@link MergingStateAccessor}.
+     */
+    @Override
+    public abstract MergingStateAccessor<K, W> state();
+  }
+
+  /** Information accessible within {@link #onTrigger}. */
+  public abstract class OnTriggerContext extends Context {
+    /** Returns the {@link PaneInfo} for the trigger firing being processed. */
+    public abstract PaneInfo paneInfo();
+
+    /** Output the given value in the current window. */
+    public abstract void output(OutputT value);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Called for each value of type {@code InputT} associated with the current key.
+   *
+   * @param c context exposing the value, its timestamp, and the key/window/state/timers
+   */
+  public abstract void processValue(ProcessValueContext c) throws Exception;
+
+  /**
+   * Called when windows are merged.
+   *
+   * @param context context whose {@link OnMergeContext#state()} is a merging state accessor
+   */
+  public abstract void onMerge(OnMergeContext context) throws Exception;
+
+  /**
+   * Called when triggers fire.
+   *
+   * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
+   * any results that should be included in the pane produced by this trigger firing.
+   *
+   * @param context context exposing the pane info and the output callback
+   */
+  public abstract void onTrigger(OnTriggerContext context) throws Exception;
+
+  /**
+   * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
+   * state. The default implementation does nothing.
+   *
+   * @param c State accessor to prefetch from.
+   */
+  public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
+
+  /**
+   * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
+   * state. The default implementation does nothing. Unlike {@link #prefetchOnMerge}, this method
+   * does not declare any checked exception.
+   *
+   * @param context State accessor to prefetch from.
+   */
+  public void prefetchOnTrigger(StateAccessor<K> context) {}
+
+  /**
+   * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
+   * called when the window is closing and will receive no future interactions.
+   *
+   * @param context context for the window whose state should be cleared
+   */
+  public abstract void clearState(Context context) throws Exception;
+
+  /**
+   * Returns a readable state whose value is true if there is no buffered state.
+   *
+   * @param context state accessor used to look up the buffered state
+   */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
new file mode 100644
index 0000000..c90940e
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * Factory for creating instances of the various {@link ReduceFn} contexts.
+ */
+class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
+  /**
+   * Callbacks handed to {@link #forTrigger}; the resulting trigger context forwards its
+   * {@code output} calls here.
+   */
+  public interface OnTriggerCallbacks<OutputT> {
+    /** Receives a value that was output through the trigger context. */
+    void output(OutputT toOutput);
+  }
+
+  private final K key;
+  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
+  private final WindowingStrategy<?, W> windowingStrategy;
+  private final StateInternals<K> stateInternals;
+  private final ActiveWindowSet<W> activeWindows;
+  private final TimerInternals timerInternals;
+  private final WindowingInternals<?, ?> windowingInternals;
+  private final PipelineOptions options;
+
+  /**
+   * Stores the collaborators shared by every context this factory hands out. No argument is
+   * validated or copied here.
+   */
+  ReduceFnContextFactory(
+      K key,
+      ReduceFn<K, InputT, OutputT, W> reduceFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternals<K> stateInternals,
+      ActiveWindowSet<W> activeWindows,
+      TimerInternals timerInternals,
+      WindowingInternals<?, ?> windowingInternals,
+      PipelineOptions options) {
+    // What is being reduced, and how it is windowed.
+    this.key = key;
+    this.reduceFn = reduceFn;
+    this.windowingStrategy = windowingStrategy;
+    this.activeWindows = activeWindows;
+
+    // Runner-provided services used by the contexts.
+    this.stateInternals = stateInternals;
+    this.timerInternals = timerInternals;
+    this.windowingInternals = windowingInternals;
+    this.options = options;
+  }
+
+  /**
+   * Where should we look for state associated with a given window?
+   *
+   * <p>The state accessors in this file switch on this value to choose the state namespace.
+   */
+  public static enum StateStyle {
+    /** All state is associated with the window itself. */
+    DIRECT,
+    /**
+     * State is associated with the 'state address' windows tracked by the active window set.
+     */
+    RENAMED
+  }
+
+  /**
+   * Builds the state accessor for {@code window}, which resolves state namespaces according to
+   * {@code style}.
+   */
+  private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
+    Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
+    StateContext<W> stateContext =
+        StateContexts.createFromComponents(options, windowingInternals, window);
+    return new StateAccessorImpl<K, W>(
+        activeWindows, windowCoder, stateInternals, stateContext, style);
+  }
+
+  public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle 
style) {
+    return new ContextImpl(stateAccessor(window, style));
+  }
+
+  public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
+      W window, InputT value, Instant timestamp, StateStyle style) {
+    return new ProcessValueContextImpl(stateAccessor(window, style), value, 
timestamp);
+  }
+
+  /**
+   * Creates the context for a trigger firing in {@code window}. The pane info is read from
+   * {@code pane} on request, and outputs are forwarded to {@code callbacks}.
+   */
+  public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(
+      W window,
+      ReadableState<PaneInfo> pane,
+      StateStyle style,
+      OnTriggerCallbacks<OutputT> callbacks) {
+    StateAccessorImpl<K, W> accessor = stateAccessor(window, style);
+    return new OnTriggerContextImpl(accessor, pane, callbacks);
+  }
+
+  /**
+   * Creates the context for merging the windows in {@code activeToBeMerged} into
+   * {@code mergeResult}.
+   */
+  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
+      Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
+    Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
+    MergingStateAccessorImpl<K, W> accessor =
+        new MergingStateAccessorImpl<K, W>(
+            activeWindows, windowCoder, stateInternals, style, activeToBeMerged, mergeResult);
+    return new OnMergeContextImpl(accessor);
+  }
+
+  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
+    return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
+        activeWindows, windowingStrategy.getWindowFn().windowCoder(), 
stateInternals, window));
+  }
+
+  private class TimersImpl implements Timers {
+    private final StateNamespace namespace;
+
+    public TimersImpl(StateNamespace namespace) {
+      Preconditions.checkArgument(namespace instanceof WindowNamespace);
+      this.namespace = namespace;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, 
timeDomain));
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentEventTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+  }
+
+  // ======================================================================
+  // StateAccessors
+  // ======================================================================
+  /**
+   * {@link StateAccessor} for a single window. Depending on the {@link StateStyle}, state is
+   * looked up either in the window's own namespace or in the namespace of the window's write
+   * state address from the active window set.
+   */
+  static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
+
+    protected final ActiveWindowSet<W> activeWindows;
+    protected final StateContext<W> context;
+    protected final StateNamespace windowNamespace;
+    protected final Coder<W> windowCoder;
+    protected final StateInternals<K> stateInternals;
+    protected final StateStyle style;
+
+    /**
+     * @param context state context supplying the window; must not be null
+     */
+    public StateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals<K> stateInternals,
+        StateContext<W> context,
+        StateStyle style) {
+      this.activeWindows = activeWindows;
+      // The coder must be assigned before namespaceFor() is called below.
+      this.windowCoder = windowCoder;
+      this.stateInternals = stateInternals;
+      this.context = checkNotNull(context);
+      this.windowNamespace = namespaceFor(context.window());
+      this.style = style;
+    }
+
+    /** Returns the state namespace for {@code window}, encoded with this accessor's coder. */
+    protected StateNamespace namespaceFor(W window) {
+      return StateNamespaces.window(windowCoder, window);
+    }
+
+    /** Returns the namespace of the context's own window, computed at construction time. */
+    protected StateNamespace windowNamespace() {
+      return windowNamespace;
+    }
+
+    /** Returns the window taken from the state context. */
+    W window() {
+      return context.window();
+    }
+
+    /** Returns the same namespace as {@link #windowNamespace()}. */
+    StateNamespace namespace() {
+      return windowNamespace();
+    }
+
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+      final StateNamespace target;
+      switch (style) {
+        case DIRECT:
+          target = windowNamespace();
+          break;
+        case RENAMED:
+          target = namespaceFor(activeWindows.writeStateAddress(context.window()));
+          break;
+        default:
+          throw new RuntimeException(); // cases are exhaustive.
+      }
+      return stateInternals.state(target, address, context);
+    }
+  }
+
+  /**
+   * {@link MergingStateAccessor} used while a collection of active windows is merged into a
+   * result window. The state context is window-only and is built from the merge result.
+   */
+  static class MergingStateAccessorImpl<K, W extends BoundedWindow>
+      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
+    private final Collection<W> activeToBeMerged;
+
+    public MergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals<K> stateInternals,
+        StateStyle style,
+        Collection<W> activeToBeMerged,
+        W mergeResult) {
+      super(
+          activeWindows,
+          windowCoder,
+          stateInternals,
+          StateContexts.windowOnly(mergeResult),
+          style);
+      this.activeToBeMerged = activeToBeMerged;
+    }
+
+    /**
+     * Accesses state for the merge result. In the RENAMED style the namespace comes from the
+     * active window set's merged write state address for the windows being merged.
+     */
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+      final StateNamespace target;
+      switch (style) {
+        case DIRECT:
+          target = windowNamespace();
+          break;
+        case RENAMED:
+          target =
+              namespaceFor(
+                  activeWindows.mergedWriteStateAddress(activeToBeMerged, context.window()));
+          break;
+        default:
+          throw new RuntimeException(); // cases are exhaustive.
+      }
+      return stateInternals.state(target, address, context);
+    }
+
+    /** Returns the namespace holding state for one merging window, or null for no match. */
+    private StateNamespace namespaceOfMerging(W mergingWindow) {
+      switch (style) {
+        case DIRECT:
+          return namespaceFor(mergingWindow);
+        case RENAMED:
+          return namespaceFor(activeWindows.writeStateAddress(mergingWindow));
+      }
+      return null;
+    }
+
+    /** Returns the state at {@code address} for each window being merged, keyed by window. */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super K, StateT> address) {
+      ImmutableMap.Builder<W, StateT> perWindow = ImmutableMap.builder();
+      for (W mergingWindow : activeToBeMerged) {
+        StateNamespace namespace = namespaceOfMerging(mergingWindow);
+        Preconditions.checkNotNull(namespace); // cases are exhaustive.
+        StateT windowState = stateInternals.state(namespace, address, context);
+        perWindow.put(mergingWindow, windowState);
+      }
+      return perWindow.build();
+    }
+  }
+
+  /**
+   * {@link MergingStateAccessor} for a single window whose "merging windows" are the window's
+   * read state addresses in the active window set. Always uses {@link StateStyle#RENAMED}.
+   */
+  static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
+      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
+    public PremergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals<K> stateInternals,
+        W window) {
+      super(
+          activeWindows,
+          windowCoder,
+          stateInternals,
+          StateContexts.windowOnly(window),
+          StateStyle.RENAMED);
+    }
+
+    /** Returns the read state addresses of the context's window. */
+    Collection<W> mergingWindows() {
+      return activeWindows.readStateAddresses(context.window());
+    }
+
+    /** Returns the state at {@code address} for every read state address window. */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super K, StateT> address) {
+      ImmutableMap.Builder<W, StateT> perAddress = ImmutableMap.builder();
+      for (W addressWindow : mergingWindows()) {
+        StateNamespace addressNamespace = namespaceFor(addressWindow);
+        StateT addressState = stateInternals.state(addressNamespace, address, context);
+        perAddress.put(addressWindow, addressState);
+      }
+      return perAddress.build();
+    }
+  }
+
+  // ======================================================================
+  // Contexts
+  // ======================================================================
+
+  /**
+   * Basic context backed by a state accessor. Its timers are created in the accessor's window
+   * namespace.
+   */
+  private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
+    private final StateAccessorImpl<K, W> stateAccessor;
+    private final TimersImpl windowTimers;
+
+    private ContextImpl(StateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.stateAccessor = state;
+      this.windowTimers = new TimersImpl(state.namespace());
+    }
+
+    /** Returns the factory's key. */
+    @Override
+    public K key() {
+      return key;
+    }
+
+    /** Returns the window of the underlying state accessor. */
+    @Override
+    public W window() {
+      return stateAccessor.window();
+    }
+
+    /** Returns the factory's windowing strategy. */
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return stateAccessor;
+    }
+
+    @Override
+    public Timers timers() {
+      return windowTimers;
+    }
+  }
+
+  /**
+   * Context for processing one value: the basic context data plus the value and its timestamp.
+   * Its timers are created in the accessor's window namespace.
+   */
+  private class ProcessValueContextImpl
+      extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
+    private final StateAccessorImpl<K, W> stateAccessor;
+    private final TimersImpl windowTimers;
+    private final InputT element;
+    private final Instant elementTimestamp;
+
+    private ProcessValueContextImpl(
+        StateAccessorImpl<K, W> state, InputT value, Instant timestamp) {
+      reduceFn.super();
+      this.stateAccessor = state;
+      this.element = value;
+      this.elementTimestamp = timestamp;
+      this.windowTimers = new TimersImpl(state.namespace());
+    }
+
+    /** Returns the factory's key. */
+    @Override
+    public K key() {
+      return key;
+    }
+
+    /** Returns the window of the underlying state accessor. */
+    @Override
+    public W window() {
+      return stateAccessor.window();
+    }
+
+    /** Returns the factory's windowing strategy. */
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return stateAccessor;
+    }
+
+    @Override
+    public Timers timers() {
+      return windowTimers;
+    }
+
+    @Override
+    public InputT value() {
+      return element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return elementTimestamp;
+    }
+  }
+
+  private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, 
W>.OnTriggerContext {
+    private final StateAccessorImpl<K, W> state;
+    private final ReadableState<PaneInfo> pane;
+    private final OnTriggerCallbacks<OutputT> callbacks;
+    private final TimersImpl timers;
+
+    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, 
ReadableState<PaneInfo> pane,
+        OnTriggerCallbacks<OutputT> callbacks) {
+      reduceFn.super();
+      this.state = state;
+      this.pane = pane;
+      this.callbacks = callbacks;
+      this.timers = new TimersImpl(state.namespace());
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public W window() {
+      return state.window();
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return state;
+    }
+
+    @Override
+    public PaneInfo paneInfo() {
+      return pane.read();
+    }
+
+    @Override
+    public void output(OutputT value) {
+      callbacks.output(value);
+    }
+
+    @Override
+    public Timers timers() {
+      return timers;
+    }
+  }
+
+  /**
+   * Merge context backed by a {@link MergingStateAccessorImpl}. Its timers are created in the
+   * accessor's window namespace.
+   */
+  private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
+    private final MergingStateAccessorImpl<K, W> mergingAccessor;
+    private final TimersImpl windowTimers;
+
+    private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.mergingAccessor = state;
+      this.windowTimers = new TimersImpl(state.namespace());
+    }
+
+    /** Returns the factory's key. */
+    @Override
+    public K key() {
+      return key;
+    }
+
+    /** Returns the window of the underlying state accessor. */
+    @Override
+    public W window() {
+      return mergingAccessor.window();
+    }
+
+    /** Returns the factory's windowing strategy. */
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+
+    @Override
+    public MergingStateAccessor<K, W> state() {
+      return mergingAccessor;
+    }
+
+    @Override
+    public Timers timers() {
+      return windowTimers;
+    }
+  }
+
+  /**
+   * Merge context backed by a {@link PremergingStateAccessorImpl}. Its timers are created in the
+   * accessor's window namespace.
+   */
+  private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
+    private final PremergingStateAccessorImpl<K, W> premergingAccessor;
+    private final TimersImpl windowTimers;
+
+    private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.premergingAccessor = state;
+      this.windowTimers = new TimersImpl(state.namespace());
+    }
+
+    /** Returns the factory's key. */
+    @Override
+    public K key() {
+      return key;
+    }
+
+    /** Returns the window of the underlying state accessor. */
+    @Override
+    public W window() {
+      return premergingAccessor.window();
+    }
+
+    /** Returns the factory's windowing strategy. */
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+
+    @Override
+    public MergingStateAccessor<K, W> state() {
+      return premergingAccessor;
+    }
+
+    @Override
+    public Timers timers() {
+      return windowTimers;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
new file mode 100644
index 0000000..864e8e7
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -0,0 +1,985 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks;
+import org.apache.beam.sdk.util.ReduceFnContextFactory.StateStyle;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+/**
+ * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} 
has partitioned the
+ * {@link PCollection} by key.
+ *
+ * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the 
execution of
+ * the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
+ *
+ * <ul>
+ * <li>Tracking the windows that are active (have buffered data) as elements 
arrive and
+ * triggers are fired.
+ * <li>Holding the watermark based on the timestamps of elements in a pane and 
releasing it
+ * when the trigger fires.
+ * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger 
execution, timer
+ * firings, etc, and providing appropriate contexts to the {@link ReduceFn} 
for actions
+ * such as output.
+ * <li>Scheduling garbage collection of state associated with a specific 
window, and making that
+ * happen when the appropriate timer fires.
+ * </ul>
+ *
+ * @param <K>       The type of key being processed.
+ * @param <InputT>  The type of values associated with the key.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W>       The type of windows this operates on.
+ */
+public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
+
+  /**
+   * The {@link ReduceFnRunner} depends on most aspects of the {@link 
WindowingStrategy}.
+   *
+   * <ul>
+   * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
+   * <li>It merges windows according to the {@link WindowingStrategy}.</li>
+   * <li>It chooses how to track active windows and clear out expired windows
+   * according to the {@link WindowingStrategy}, based on the allowed lateness 
and
+   * whether windows can merge.</li>
+   * <li>It decides whether to emit empty final panes according to whether the
+   * {@link WindowingStrategy} requires it.</li>
+   * <li>It uses discarding or accumulation mode according to the {@link 
WindowingStrategy}.</li>
+   * </ul>
+   */
+  private final WindowingStrategy<Object, W> windowingStrategy;
+
+  private final OutputWindowedValue<KV<K, OutputT>> outputter;
+
+  private final StateInternals<K> stateInternals;
+
+  private final Aggregator<Long, Long> droppedDueToClosedWindow;
+
+  private final K key;
+
+  /**
+   * Track which windows are still active and the 'state address' windows 
which hold their state.
+   *
+   * <ul>
+   * <li>State: Global map for all active windows for this computation and key.
+   * <li>Lifetime: Cleared when no active windows need to be tracked. A window 
lives within
+   * the active window set until its trigger is closed or the window is 
garbage collected.
+   * </ul>
+   */
+  private final ActiveWindowSet<W> activeWindows;
+
+  /**
+   * Always a {@link SystemReduceFn}.
+   *
+   * <ul>
+   * <li>State: A bag of accumulated values, or the intermediate result of a 
combiner.
+   * <li>State style: RENAMED
+   * <li>Merging: Concatenate or otherwise combine the state from each merged 
window.
+   * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. 
Otherwise cleared
+   * when trigger is finished or when the window is garbage collected.
+   * </ul>
+   */
+  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
+
+  /**
+   * Manage the setting and firing of timer events.
+   *
+   * <ul>
+   * <li>Merging: End-of-window and garbage collection timers are cancelled 
when windows are
+   * merged away. Timers created by triggers are never garbage collected and 
are left to
+   * fire and be ignored.
+   * <li>Lifetime: Timers automatically disappear after they fire.
+   * </ul>
+   */
+  private final TimerInternals timerInternals;
+
+  /**
+   * Manage the execution and state for triggers.
+   *
+   * <ul>
+   * <li>State: Tracks which sub-triggers have finished, and any additional 
state needed to
+   * determine when the trigger should fire.
+   * <li>State style: DIRECT
+   * <li>Merging: Finished bits are explicitly managed. Other state is eagerly 
merged as
+   * needed.
+   * <li>Lifetime: Most trigger state is cleared when the final pane is 
emitted. However
+   * the finished bits are left behind and must be cleared when the window is
+   * garbage collected.
+   * </ul>
+   */
+  private final TriggerRunner<W> triggerRunner;
+
+  /**
+   * Store the output watermark holds for each window.
+   *
+   * <ul>
+   * <li>State: Bag of hold timestamps.
+   * <li>State style: RENAMED
+   * <li>Merging: Depending on {@link OutputTimeFn}, may need to be 
recalculated on merging.
+   * When a pane fires it may be necessary to add (back) an end-of-window or 
garbage collection
+   * hold.
+   * <li>Lifetime: Cleared when a pane fires or when the window is garbage 
collected.
+   * </ul>
+   */
+  private final WatermarkHold<W> watermarkHold;
+
+  private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
+
+  /**
+   * Store the previously emitted pane (if any) for each window.
+   *
+   * <ul>
+   * <li>State: The previous {@link PaneInfo} passed to the user's {@link 
DoFn#processElement},
+   * if any.
+   * <li>State style: DIRECT
+   * <li>Merging: Always keyed by actual window, so does not depend on {@link 
#activeWindows}.
+   * Cleared when window is merged away.
+   * <li>Lifetime: Cleared when trigger is closed or window is garbage 
collected.
+   * </ul>
+   */
+  private final PaneInfoTracker paneInfoTracker;
+
+  /**
+   * Store whether we've seen any elements for a window since the last pane 
was emitted.
+   *
+   * <ul>
+   * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements 
added so far.
+   * <li>State style: RENAMED.
+   * <li>Merging: Counts are summed when windows are merged.
+   * <li>Lifetime: Cleared when pane fires or window is garbage collected.
+   * </ul>
+   */
+  private final NonEmptyPanes<K, W> nonEmptyPanes;
+
+  /**
+   * Wire up the collaborators used to process elements and timers for
+   * {@code key}.
+   *
+   * <p>Construction may incur I/O, since the active window set is created
+   * here.
+   */
+  public ReduceFnRunner(
+      K key,
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternals<K> stateInternals,
+      TimerInternals timerInternals,
+      WindowingInternals<?, KV<K, OutputT>> windowingInternals,
+      Aggregator<Long, Long> droppedDueToClosedWindow,
+      ReduceFn<K, InputT, OutputT, W> reduceFn,
+      PipelineOptions options) {
+    this.key = key;
+    this.timerInternals = timerInternals;
+    this.paneInfoTracker = new PaneInfoTracker(timerInternals);
+    this.stateInternals = stateInternals;
+    this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
+    this.droppedDueToClosedWindow = droppedDueToClosedWindow;
+    this.reduceFn = reduceFn;
+
+    // The strategy's input type parameter is widened to Object via an
+    // unchecked cast.
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<Object, W> objectWindowingStrategy =
+        (WindowingStrategy<Object, W>) windowingStrategy;
+    this.windowingStrategy = objectWindowingStrategy;
+
+    this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, 
this.reduceFn);
+
+    // Must come after this.windowingStrategy and this.stateInternals are
+    // assigned, since createActiveWindowSet reads both.
+    // Note this may incur I/O to load persisted window set data.
+    this.activeWindows = createActiveWindowSet();
+
+    this.contextFactory =
+        new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, 
this.windowingStrategy,
+            stateInternals, this.activeWindows, timerInternals, 
windowingInternals, options);
+
+    this.watermarkHold = new WatermarkHold<>(timerInternals, 
windowingStrategy);
+    this.triggerRunner =
+        new TriggerRunner<>(
+            windowingStrategy.getTrigger(),
+            new TriggerContextFactory<>(windowingStrategy, stateInternals, 
activeWindows));
+  }
+
+  /**
+   * Choose the {@link ActiveWindowSet} implementation matching the window
+   * function: a non-merging set when the window function never merges,
+   * otherwise a merging set backed by {@code stateInternals}.
+   */
+  private ActiveWindowSet<W> createActiveWindowSet() {
+    if (windowingStrategy.getWindowFn().isNonMerging()) {
+      return new NonMergingActiveWindowSet<W>();
+    }
+    return new MergingActiveWindowSet<W>(
+        windowingStrategy.getWindowFn(), stateInternals);
+  }
+
+  /**
+   * Report whether the trigger runner considers the trigger closed for
+   * {@code window}, looking at the window's DIRECT-style state.
+   */
+  @VisibleForTesting
+  boolean isFinished(W window) {
+    ReduceFn<K, InputT, OutputT, W>.Context directContext =
+        contextFactory.base(window, StateStyle.DIRECT);
+    return triggerRunner.isClosed(directContext.state());
+  }
+
+  /**
+   * Report whether the active window set currently tracks no ACTIVE or NEW
+   * windows at all.
+   */
+  @VisibleForTesting
+  boolean hasNoActiveWindows() {
+    boolean tracksAnyWindow =
+        !activeWindows.getActiveAndNewWindows().isEmpty();
+    return !tracksAnyWindow;
+  }
+
+  /**
+   * Incorporate {@code values} into the underlying reduce function, and 
manage holds, timers,
+   * triggers, and window merging.
+   *
+   * <p>The general strategy is:
+   * <ol>
+   * <li>Use {@link WindowedValue#getWindows} (itself determined using
+   * {@link WindowFn#assignWindows}) to determine which windows each element 
belongs to. Some
+   * of those windows will already have state associated with them. The rest 
are considered
+   * NEW.
+   * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently 
ACTIVE and NEW windows.
+   * Each NEW window will become either ACTIVE or be discarded.
+   * (See {@link ActiveWindowSet} for definitions of these terms.)
+   * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE 
state address
+   * windows before any state is associated with the NEW window. In the common 
case that
+   * windows for new elements are merged into existing ACTIVE windows then no 
additional
+   * storage or merging overhead will be incurred.
+   * <li>Otherwise, keep track of the state address windows for ACTIVE windows 
so that their
+   * states can be merged on-demand when a pane fires.
+   * <li>Process the element for each of the windows its windows have been 
merged into according
+   * to {@link ActiveWindowSet}. Processing may require running triggers, 
setting timers,
+   * setting holds, and invoking {@link ReduceFn#onTrigger}.
+   * </ol>
+   *
+   * @param values the windowed values to process; iterated twice, once to
+   *     collect and merge windows and once to process each element
+   */
+  public void processElements(Iterable<WindowedValue<InputT>> values) throws 
Exception {
+    // If an incoming element introduces a new window, attempt to merge it 
into an existing
+    // window eagerly.
+    Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
+
+    Set<W> windowsToConsider = new HashSet<>();
+
+    // Process each element, using the updated activeWindows determined by 
collectAndMergeWindows.
+    for (WindowedValue<InputT> value : values) {
+      windowsToConsider.addAll(processElement(windowToMergeResult, value));
+    }
+
+    // Trigger output from any window for which the trigger is ready
+    for (W mergedWindow : windowsToConsider) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext =
+          contextFactory.base(mergedWindow, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+          contextFactory.base(mergedWindow, StateStyle.RENAMED);
+      triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
+      emitIfAppropriate(directContext, renamedContext);
+    }
+
+    // We're all done with merging and emitting elements so can compress the 
activeWindow state.
+    // Any windows which are still NEW must have come in on a new element 
which was then discarded
+    // due to the window's trigger being closed. We can thus delete them.
+    activeWindows.cleanupTemporaryWindows();
+  }
+
+  /**
+   * Persist the active window set by delegating to
+   * {@code activeWindows.persist()}.
+   */
+  public void persist() {
+    activeWindows.persist();
+  }
+
+  /**
+   * Extract the windows associated with the values, and invoke merge. Return 
a map
+   * from windows to the merge result window. If a window is not in the domain 
of
+   * the result map then it did not get merged into a different window.
+   *
+   * @param values the windowed values whose windows should be registered and
+   *     merged
+   * @return an empty immutable map when the window function is non-merging,
+   *     otherwise a mutable map filled in by {@link OnMergeCallback#onMerge}
+   */
+  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> 
values)
+      throws Exception {
+    // No-op if no merging can take place
+    if (windowingStrategy.getWindowFn().isNonMerging()) {
+      return ImmutableMap.of();
+    }
+
+    // Collect the windows from all elements (except those which are too late) 
and
+    // make sure they are already in the active window set or are added as NEW 
windows.
+    // NOTE(review): no lateness filtering happens in this loop; presumably
+    // too-late elements are dropped before reaching this method -- confirm.
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow untypedWindow : value.getWindows()) {
+        @SuppressWarnings("unchecked")
+        W window = (W) untypedWindow;
+
+        // For backwards compat with pre 1.4 only.
+        // We may still have ACTIVE windows with multiple state addresses, 
representing
+        // a window whose state has not yet been eagerly merged.
+        // We'll go ahead and merge that state now so that we don't have to 
worry about
+        // this legacy case anywhere else.
+        if (activeWindows.isActive(window)) {
+          Set<W> stateAddressWindows = 
activeWindows.readStateAddresses(window);
+          if (stateAddressWindows.size() > 1) {
+            // This is a legacy window whose state has not been eagerly merged.
+            // Do that now.
+            ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
+                contextFactory.forPremerge(window);
+            reduceFn.onMerge(premergeContext);
+            watermarkHold.onMerge(premergeContext);
+            activeWindows.merged(window);
+          }
+        }
+
+        // Add this window as NEW if it is not currently ACTIVE.
+        // If we had already seen this window and closed its trigger, then the
+        // window will not be currently ACTIVE. It will then be added as NEW 
here,
+        // and fall into the merging logic as usual.
+        activeWindows.ensureWindowExists(window);
+      }
+    }
+
+    // Merge all of the active windows and retain a mapping from source 
windows to result windows.
+    Map<W, W> windowToMergeResult = new HashMap<>();
+    activeWindows.merge(new OnMergeCallback(windowToMergeResult));
+    return windowToMergeResult;
+  }
+
+  /**
+   * Merge callback handed to {@code activeWindows.merge}. It records every
+   * merged window in {@code windowToMergeResult} and merges reduce function,
+   * watermark hold, non-empty pane and trigger state into the merge result
+   * window.
+   */
+  private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
+    // Filled in by onMerge: maps each merged window to its merge result.
+    private final Map<W, W> windowToMergeResult;
+
+    OnMergeCallback(Map<W, W> windowToMergeResult) {
+      this.windowToMergeResult = windowToMergeResult;
+    }
+
+    /**
+     * Return the subset of {@code windows} which are currently ACTIVE. We 
only need to worry
+     * about merging state from ACTIVE windows. NEW windows by definition have 
no existing state.
+     */
+    private List<W> activeWindows(Iterable<W> windows) {
+      List<W> active = new ArrayList<>();
+      for (W window : windows) {
+        // Here activeWindows is the enclosing runner's field, not this method.
+        if (activeWindows.isActive(window)) {
+          active.add(window);
+        }
+      }
+      return active;
+    }
+
+    /**
+     * Called from the active window set to indicate {@code toBeMerged} (of 
which only
+     * {@code activeToBeMerged} are ACTIVE and thus have state associated with 
them) will later
+     * be merged into {@code mergeResult}.
+     */
+    @Override
+    public void prefetchOnMerge(
+        Collection<W> toBeMerged, W mergeResult) throws Exception {
+      List<W> activeToBeMerged = activeWindows(toBeMerged);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
+          contextFactory.forMerge(activeToBeMerged, mergeResult, 
StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
+          contextFactory.forMerge(activeToBeMerged, mergeResult, 
StateStyle.RENAMED);
+
+      // Prefetch various state.
+      triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, 
directMergeContext.state());
+      reduceFn.prefetchOnMerge(renamedMergeContext.state());
+      watermarkHold.prefetchOnMerge(renamedMergeContext.state());
+      nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
+    }
+
+    /**
+     * Called from the active window set to indicate {@code toBeMerged} (of 
which only
+     * {@code activeToBeMerged} are ACTIVE and thus have state associated with 
them) are about
+     * to be merged into {@code mergeResult}.
+     */
+    @Override
+    public void onMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+      // Remember we have merged these windows.
+      for (W window : toBeMerged) {
+        windowToMergeResult.put(window, mergeResult);
+      }
+
+      // At this point activeWindows has NOT incorporated the results of the 
merge.
+      List<W> activeToBeMerged = activeWindows(toBeMerged);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
+          contextFactory.forMerge(activeToBeMerged, mergeResult, 
StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
+          contextFactory.forMerge(activeToBeMerged, mergeResult, 
StateStyle.RENAMED);
+
+      // Run the reduceFn to perform any needed merging.
+      reduceFn.onMerge(renamedMergeContext);
+
+      // Merge the watermark holds.
+      watermarkHold.onMerge(renamedMergeContext);
+
+      // Merge non-empty pane state.
+      nonEmptyPanes.onMerge(renamedMergeContext.state());
+
+      // Have the trigger merge state as needed.
+      triggerRunner.onMerge(
+          directMergeContext.window(), directMergeContext.timers(), 
directMergeContext.state());
+
+      for (W active : activeToBeMerged) {
+        if (active.equals(mergeResult)) {
+          // Not merged away.
+          continue;
+        }
+        // Cleanup flavor A: Currently ACTIVE window is about to be merged 
away.
+        // Clear any state not already cleared by the onMerge calls above.
+        WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", 
active, mergeResult);
+        ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
+            contextFactory.base(active, StateStyle.DIRECT);
+        // No need for the end-of-window or garbage collection timers.
+        // We will establish a new end-of-window or garbage collection timer 
for the mergeResult
+        // window in processElement below. There must be at least one element 
for the mergeResult
+        // window since a new element with a new window must have triggered 
this onMerge.
+        cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
+        // We no longer care about any previous panes of merged away windows. 
The
+        // merge result window gets to start fresh if it is new.
+        paneInfoTracker.clear(directClearContext.state());
+      }
+    }
+  }
+
+  /**
+   * Process an element.
+   *
+   * @param windowToMergeResult map from a window to the window it was merged
+   *     into; a window with no entry is used as-is
+   * @param value the value being processed
+   * @return the set of windows in which the element was actually processed
+   */
+  private Collection<W> processElement(Map<W, W> windowToMergeResult, 
WindowedValue<InputT> value)
+      throws Exception {
+    // Redirect element windows to the ACTIVE windows they have been merged 
into.
+    // The compressed representation (value, {window1, window2, ...}) actually 
represents
+    // distinct elements (value, window1), (value, window2), ...
+    // so if window1 and window2 merge, the resulting window will contain both 
copies
+    // of the value.
+    Collection<W> windows = new ArrayList<>();
+    for (BoundedWindow untypedWindow : value.getWindows()) {
+      @SuppressWarnings("unchecked")
+      W window = (W) untypedWindow;
+      W mergeResult = windowToMergeResult.get(window);
+      if (mergeResult == null) {
+        mergeResult = window;
+      }
+      windows.add(mergeResult);
+    }
+
+    // Prefetch in each of the windows if we're going to need to process 
triggers
+    for (W window : windows) {
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = 
contextFactory.forValue(
+          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+      triggerRunner.prefetchForValue(window, directContext.state());
+    }
+
+    // Process the element for each (mergeResultWindow, not closed) window it 
belongs to.
+    List<W> triggerableWindows = new ArrayList<>(windows.size());
+    for (W window : windows) {
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = 
contextFactory.forValue(
+          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+      if (triggerRunner.isClosed(directContext.state())) {
+        // This window has already been closed.
+        droppedDueToClosedWindow.addValue(1L);
+        WindowTracing.debug(
+            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; 
window:{} "
+            + "since window is no longer active at inputWatermark:{}; 
outputWatermark:{}",
+            value.getTimestamp(), key, window, 
timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        continue;
+      }
+
+      triggerableWindows.add(window);
+      activeWindows.ensureWindowIsActive(window);
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = 
contextFactory.forValue(
+          window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
+
+      nonEmptyPanes.recordContent(renamedContext.state());
+
+      // Make sure we've scheduled the end-of-window or garbage collection 
timer for this window.
+      Instant timer = 
scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
+
+      // Hold back progress of the output watermark until we have processed 
the pane this
+      // element will be included within. If the element is too late for that, 
place a hold at
+      // the end-of-window or garbage collection time to allow empty panes to 
contribute elements
+      // which won't be dropped due to lateness by a following computation 
(assuming the following
+      // computation uses the same allowed lateness value...)
+      @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
+
+      if (hold != null) {
+        // Assert that holds have a proximate timer.
+        boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
+        boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
+        Preconditions.checkState(
+            holdInWindow == timerInWindow,
+            "set a hold at %s, a timer at %s, which disagree as to whether 
they are in window %s",
+            hold,
+            timer,
+            directContext.window());
+      }
+
+      // Execute the reduceFn, which will buffer the value as appropriate
+      reduceFn.processValue(renamedContext);
+
+      // Run the trigger to update its state
+      triggerRunner.processValue(
+          directContext.window(),
+          directContext.timestamp(),
+          directContext.timers(),
+          directContext.state());
+
+      // At this point, if triggerRunner.shouldFire before the processValue 
then
+      // triggerRunner.shouldFire after the processValue. In other words 
adding values
+      // cannot take a trigger state from firing to non-firing.
+      // (We don't actually assert this since it is too slow.)
+    }
+
+    return triggerableWindows;
+  }
+
+  /**
+   * Called when an end-of-window, garbage collection, or trigger-specific 
timer fires.
+   *
+   * @param timer the timer which fired; its namespace must be a
+   *     {@code WindowNamespace}, which is checked on entry
+   */
+  public void onTimer(TimerData timer) throws Exception {
+    // Which window is the timer for?
+    Preconditions.checkArgument(timer.getNamespace() instanceof 
WindowNamespace,
+        "Expected timer to be in WindowNamespace, but was in %s", 
timer.getNamespace());
+    @SuppressWarnings("unchecked")
+    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) 
timer.getNamespace();
+    W window = windowNamespace.getWindow();
+    ReduceFn<K, InputT, OutputT, W>.Context directContext =
+        contextFactory.base(window, StateStyle.DIRECT);
+    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+        contextFactory.base(window, StateStyle.RENAMED);
+
+    // Has this window had its trigger finish?
+    // - The trigger may implement isClosed as constant false.
+    // - If the window function does not support windowing then all windows 
will be considered
+    // active.
+    // NOTE(review): "windowing" above presumably means "merging" -- confirm.
+    // So we must take conjunction of activeWindows and triggerRunner state.
+    boolean windowIsActiveAndOpen =
+        activeWindows.isActive(window) && 
!triggerRunner.isClosed(directContext.state());
+
+    if (!windowIsActiveAndOpen) {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window 
{}", timer, window);
+    }
+
+    // If this is an end-of-window timer then we may need to set a garbage 
collection timer
+    // if allowed lateness is non-zero.
+    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+        && timer.getTimestamp().equals(window.maxTimestamp());
+
+    // If this is a garbage collection timer then we should trigger and
+    // garbage collect the window. We'll consider any event-time timer at or
+    // after the cleanup time to be a signal to garbage collect.
+    Instant cleanupTime = garbageCollectionTime(window);
+    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
+        && !timer.getTimestamp().isBefore(cleanupTime);
+
+    if (isGarbageCollection) {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} 
with "
+          + "inputWatermark:{}; outputWatermark:{}",
+          key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+
+      if (windowIsActiveAndOpen) {
+        // We need to call onTrigger to emit the final pane if required.
+        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been 
emitted,
+        // and the watermark has passed the end of the window.
+        @Nullable Instant newHold =
+            onTrigger(directContext, renamedContext, true/* isFinished */, 
isEndOfWindow);
+        Preconditions.checkState(newHold == null,
+            "Hold placed at %s despite isFinished being true.", newHold);
+      }
+
+      // Cleanup flavor B: Clear all the remaining state for this window since 
we'll never
+      // see elements for it again.
+      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+    } else {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with 
"
+          + "inputWatermark:{}; outputWatermark:{}",
+          key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+      if (windowIsActiveAndOpen) {
+        emitIfAppropriate(directContext, renamedContext);
+      }
+
+      if (isEndOfWindow) {
+        // If the window strategy trigger includes a watermark trigger then at 
this point
+        // there should be no data holds, either because we'd already cleared 
them on an
+        // earlier onTrigger, or because we just cleared them on the above 
emitIfAppropriate.
+        // We could assert this but it is very expensive.
+
+        // Since we are processing an on-time firing we should schedule the 
garbage collection
+        // timer. (If getAllowedLateness is zero then the timer event will be 
considered a
+        // cleanup event and handled by the above).
+        // Note we must do this even if the trigger is finished so that we are 
sure to cleanup
+        // any final trigger finished bits.
+        Preconditions.checkState(
+            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+            "Unexpected zero getAllowedLateness");
+        WindowTracing.debug(
+            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; 
window:{} at {} with "
+            + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), cleanupTime, 
timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        
Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                                 "Cleanup time %s is beyond end-of-time", 
cleanupTime);
+        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+      }
+    }
+  }
+
+  /**
+   * Clear all the state associated with {@code context}'s window.
+   * Should only be invoked if we know all future elements for this window 
will be considered
+   * beyond allowed lateness.
+   * This is a superset of the clearing done by {@link #emitIfAppropriate} 
below since:
+   * <ol>
+   * <li>We can clear the trigger finished bits since we'll never need to ask 
if the trigger is
+   * closed again.
+   * <li>We can clear any remaining garbage collection hold.
+   * </ol>
+   *
+   * @param directContext context for the window using DIRECT-style state
+   * @param renamedContext context for the same window using RENAMED-style
+   *     state
+   * @param windowIsActiveAndOpen whether the caller found the window ACTIVE
+   *     with its trigger not closed; selects which state is cleared
+   */
+  private void clearAllState(
+      ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
+      boolean windowIsActiveAndOpen)
+      throws Exception {
+    if (windowIsActiveAndOpen) {
+      // Since both the window is in the active window set AND the trigger was 
not yet closed,
+      // it is possible we still have state.
+      reduceFn.clearState(renamedContext);
+      watermarkHold.clearHolds(renamedContext);
+      nonEmptyPanes.clearPane(renamedContext.state());
+      // These calls work irrespective of whether the window is active or not, 
but
+      // are unnecessary if the window is not active.
+      triggerRunner.clearState(
+          directContext.window(), directContext.timers(), 
directContext.state());
+      paneInfoTracker.clear(directContext.state());
+    } else {
+      // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or 
triggerRunner.isClosed (2).
+      // For (1), if !activeWindows.isActive then the window must be merging 
and has been
+      // explicitly removed by emitIfAppropriate. But in that case the trigger 
must have fired
+      // and been closed, so this case reduces to (2).
+      // For (2), if triggerRunner.isClosed then the trigger was fired and 
entered the
+      // closed state. In that case emitIfAppropriate will have cleared all 
state in
+      // reduceFn, triggerRunner (except for finished bits), paneInfoTracker 
and activeWindows.
+      // We also know nonEmptyPanes must have been unconditionally cleared by 
the trigger.
+      // Since the trigger fired the existing watermark holds must have been 
cleared, and since
+      // the trigger closed no new end of window or garbage collection hold 
will have been
+      // placed by WatermarkHold.extractAndRelease.
+      // Thus all the state clearing above is unnecessary.
+      //
+      // But(!) for backwards compatibility we must allow a pipeline to be 
updated from
+      // an sdk version <= 1.3. In that case it is possible we have an 
end-of-window or
+      // garbage collection hold keyed by the current window (reached via 
directContext) rather
+      // than the state address window (reached via renamedContext).
+      // However this can only happen if:
+      // - We have merging windows.
+      // - We are DISCARDING_FIRED_PANES.
+      // - A pane has fired.
+      // - But the trigger is not (yet) closed.
+      if (windowingStrategy.getMode() == 
AccumulationMode.DISCARDING_FIRED_PANES
+          && !windowingStrategy.getWindowFn().isNonMerging()) {
+        watermarkHold.clearHolds(directContext);
+      }
+    }
+
+    // Don't need to track address state windows anymore.
+    activeWindows.remove(directContext.window());
+    // We'll never need to test for the trigger being closed again.
+    triggerRunner.clearFinished(directContext.state());
+  }
+
+  /**
+   * Decide whether the reduce function state should be cleared after a
+   * firing.
+   *
+   * @param isFinished whether this is the last firing for the trigger
+   * @return {@code true} if this is the last firing, or if the accumulation
+   *     mode is DISCARDING_FIRED_PANES so nothing should be accumulated
+   *     between panes
+   */
+  private boolean shouldDiscardAfterFiring(boolean isFinished) {
+    return isFinished
+        || windowingStrategy.getMode()
+            == AccumulationMode.DISCARDING_FIRED_PANES;
+  }
+
+  /**
+   * Possibly emit a pane if a trigger is ready to fire or timers require it, 
and cleanup state.
+   *
+   * @param directContext context for the window using DIRECT-style state;
+   *     used for the trigger and pane info calls
+   * @param renamedContext context for the same window using RENAMED-style
+   *     state; used for the reduce function and non-empty pane calls
+   */
+  private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context 
directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
+      throws Exception {
+    if (!triggerRunner.shouldFire(
+        directContext.window(), directContext.timers(), 
directContext.state())) {
+      // Ignore unless trigger is ready to fire
+      return;
+    }
+
+    // Inform the trigger of the transition to see if it is finished
+    triggerRunner.onFire(directContext.window(), directContext.timers(), 
directContext.state());
+    boolean isFinished = triggerRunner.isClosed(directContext.state());
+
+    // Will be able to clear all element state after triggering?
+    boolean shouldDiscard = shouldDiscardAfterFiring(isFinished);
+
+    // Run onTrigger to produce the actual pane contents.
+    // As a side effect it will clear all element holds, but not necessarily 
any
+    // end-of-window or garbage collection holds.
+    onTrigger(directContext, renamedContext, isFinished, false 
/*isEndOfWindow*/);
+
+    // Now that we've triggered, the pane is empty.
+    nonEmptyPanes.clearPane(renamedContext.state());
+
+    // Cleanup buffered data if appropriate
+    if (shouldDiscard) {
+      // Cleanup flavor C: The user does not want any buffered data to persist 
between panes.
+      reduceFn.clearState(renamedContext);
+    }
+
+    if (isFinished) {
+      // Cleanup flavor D: If trigger is closed we will ignore all new 
incoming elements.
+      // Clear state not otherwise cleared by onTrigger and clearPane above.
+      // Remember the trigger is, indeed, closed until the window is garbage 
collected.
+      triggerRunner.clearState(
+          directContext.window(), directContext.timers(), 
directContext.state());
+      paneInfoTracker.clear(directContext.state());
+      activeWindows.remove(directContext.window());
+    }
+  }
+
+  /**
+   * Decide whether a pane must be emitted.
+   *
+   * @param isEmpty whether the pane has no elements
+   * @param isFinished whether this is known to be the final pane
+   * @param timing the timing of the pane under consideration
+   * @return {@code true} if the pane has elements, if it is the unique
+   *     ON_TIME pane, or if it is the final pane and the closing behavior
+   *     is FIRE_ALWAYS (the user wants the final pane even when empty)
+   */
+  private boolean needToEmit(
+      boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
+    boolean hasElements = !isEmpty;
+    boolean isOnTimePane = timing == Timing.ON_TIME;
+    return hasElements
+        || isOnTimePane
+        || (isFinished
+            && windowingStrategy.getClosingBehavior()
+                == ClosingBehavior.FIRE_ALWAYS);
+  }
+
+  /**
+   * Run the {@link ReduceFn#onTrigger} method and produce any necessary 
output.
+   *
+   * @param directContext context for the window using DIRECT-style state
+   * @param renamedContext context for the same window using RENAMED-style
+   *     state
+   * @param isFinished whether the trigger is finished, so that this is the
+   *     final pane
+   * @param isEndOfWindow whether this firing is for the end-of-window timer;
+   *     only used in the consistency checks on the new hold
+   * @return output watermark hold added, or {@literal null} if none.
+   */
+  @Nullable
+  private Instant onTrigger(
+      final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
+      boolean isFinished, boolean isEndOfWindow)
+          throws Exception {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // Prefetch necessary states
+    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
+        watermarkHold.extractAndRelease(renamedContext, 
isFinished).readLater();
+    ReadableState<PaneInfo> paneFuture =
+        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
+    ReadableState<Boolean> isEmptyFuture =
+        nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+
+    // NOTE(review): the reduceFn calls elsewhere in this class that name a
+    // state style use RENAMED; confirm that prefetching against the DIRECT
+    // state is intended here.
+    reduceFn.prefetchOnTrigger(directContext.state());
+    triggerRunner.prefetchOnFire(directContext.window(), 
directContext.state());
+
+    // Calculate the pane info.
+    final PaneInfo pane = paneFuture.read();
+    // Extract the window hold, and as a side effect clear it.
+
+    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+    final Instant outputTimestamp = pair.oldHold;
+    @Nullable Instant newHold = pair.newHold;
+
+    if (newHold != null) {
+      // We can't be finished yet.
+      Preconditions.checkState(
+        !isFinished, "new hold at %s but finished %s", newHold, 
directContext.window());
+      // The hold cannot be behind the input watermark.
+      Preconditions.checkState(
+        !newHold.isBefore(inputWM), "new hold %s is before input watermark 
%s", newHold, inputWM);
+      if (newHold.isAfter(directContext.window().maxTimestamp())) {
+        // The hold must be for garbage collection, which can't have happened 
yet.
+        Preconditions.checkState(
+          newHold.isEqual(garbageCollectionTime(directContext.window())),
+          "new hold %s should be at garbage collection for window %s plus %s",
+          newHold,
+          directContext.window(),
+          windowingStrategy.getAllowedLateness());
+      } else {
+        // The hold must be for the end-of-window, which can't have happened 
yet.
+        Preconditions.checkState(
+          newHold.isEqual(directContext.window().maxTimestamp()),
+          "new hold %s should be at end of window %s",
+          newHold,
+          directContext.window());
+        Preconditions.checkState(
+          !isEndOfWindow,
+          "new hold at %s for %s but this is the watermark trigger",
+          newHold,
+          directContext.window());
+      }
+    }
+
+    // Only emit a pane if it has data or empty panes are observable.
+    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+      // Run reduceFn.onTrigger method.
+      final List<W> windows = 
Collections.singletonList(directContext.window());
+      ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
+          contextFactory.forTrigger(directContext.window(), paneFuture, 
StateStyle.RENAMED,
+              new OnTriggerCallbacks<OutputT>() {
+                @Override
+                public void output(OutputT toOutput) {
+                  // We're going to output panes, so commit the (now used) 
PaneInfo.
+                  // TODO: This is unnecessary if the trigger isFinished since 
the saved
+                  // state will be immediately deleted.
+                  paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
+
+                  // Output the actual value.
+                  outputter.outputWindowedValue(
+                      KV.of(key, toOutput), outputTimestamp, windows, pane);
+                }
+              });
+
+      reduceFn.onTrigger(renamedTriggerContext);
+    }
+
+    return newHold;
+  }
+
+  /**
+   * Make sure we'll eventually have a timer fire which will tell us to 
garbage collect
+   * the window state. For efficiency we may need to do this in two steps 
rather
+   * than one. Return the time at which the timer will fire.
+   *
+   * <ul>
+   * <li>If allowedLateness is zero then we'll garbage collect at the end of 
the window.
+   * For simplicity we'll set our own timer for this situation even though an
+   * {@link AfterWatermark} trigger may have also set an end-of-window timer.
+   * ({@code setTimer} is idempotent.)
+   * <li>If allowedLateness is non-zero then we could just always set a timer 
for the garbage
+   * collection time. However if the windows are large (eg hourly) and the 
allowedLateness is small
+   * (eg seconds) then we'll end up with nearly twice the number of timers 
in-flight. So we
+   * instead set an end-of-window timer and then roll that forward to a 
garbage collection timer
+   * when it fires. We use the input watermark to distinguish those cases.
+   * </ul>
+   */
+  private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Instant endOfWindow = directContext.window().maxTimestamp();
+    String which;
+    Instant timer;
+    if (endOfWindow.isBefore(inputWM)) {
+      timer = garbageCollectionTime(directContext.window());
+      which = "garbage collection";
+    } else {
+      timer = endOfWindow;
+      which = "end-of-window";
+    }
+    WindowTracing.trace(
+        "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: 
Scheduling {} timer at {} for "
+        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        which,
+        timer,
+        key,
+        directContext.window(),
+        inputWM,
+        timerInternals.currentOutputWatermarkTime());
+    Preconditions.checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                             "Timer %s is beyond end-of-time", timer);
+    directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
+    return timer;
+  }
+
+  /**
+   * Delete the end-of-window timer for the window of {@code directContext} and, when the
+   * garbage collection time is strictly after the end of the window, the separate garbage
+   * collection timer as well.
+   *
+   * @param directContext context supplying the window and the timers to delete from
+   */
+  private void cancelEndOfWindowAndGarbageCollectionTimers(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
+    WindowTracing.debug(
+        "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
+        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        key, directContext.window(), timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    Instant eow = directContext.window().maxTimestamp();
+    directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
+    Instant gc = garbageCollectionTime(directContext.window());
+    if (gc.isAfter(eow)) {
+      // The garbage collection timer only exists separately when it is later than the
+      // end-of-window timer; it must be deleted by its own timestamp, not by eow again.
+      directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
+    }
+  }
+
+  /**
+   * Compute the time at which {@code window} should be garbage collected: the end of the
+   * window plus the allowed lateness, truncated to the end of the global window when the
+   * sum would reach or pass it.
+   */
+  private Instant garbageCollectionTime(W window) {
+    Instant endOfTime = GlobalWindow.INSTANCE.maxTimestamp();
+    Instant windowEnd = window.maxTimestamp();
+
+    // Subtract the allowed lateness from the "end of time" instead of adding it to the end
+    // of the window: the addition of EOW + allowed lateness might overflow the maximum
+    // allowed Instant.
+    Instant latestUncappedEnd = endOfTime.minus(windowingStrategy.getAllowedLateness());
+    if (!latestUncappedEnd.isBefore(windowEnd)) {
+      return windowEnd.plus(windowingStrategy.getAllowedLateness());
+    }
+    return endOfTime;
+  }
+
+  /**
+   * Something that can emit a value together with all of its windowing information
+   * (timestamp, windows and pane). It deliberately exposes only the single output operation
+   * of {@link WindowingInternals} that is needed here.
+   */
+  private interface OutputWindowedValue<OutputT> {
+    /** Emit {@code output} with the given timestamp, in the given windows and pane. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane);
+  }
+
+  /**
+   * An {@link OutputWindowedValue} that forwards every output unchanged to a wrapped
+   * {@link WindowingInternals}.
+   */
+  private static class OutputViaWindowingInternals<OutputT>
+      implements OutputWindowedValue<OutputT> {
+
+    /** The {@link WindowingInternals} that actually receives each output. */
+    private final WindowingInternals<?, OutputT> delegate;
+
+    public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
+      delegate = windowingInternals;
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      delegate.outputWindowedValue(output, timestamp, windows, pane);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
new file mode 100644
index 0000000..78377c8
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.List;
+
+/**
+ * A {@link DoFnRunnerBase} that runs a {@link DoFn} by building the process context for
+ * each element and handing it to the {@link DoFn}.
+ *
+ * @param <InputT> the type of the DoFn's (main) input elements
+ * @param <OutputT> the type of the DoFn's (main) output elements
+ */
+public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT> {
+
+  /** Forwards every argument unchanged to the {@link DoFnRunnerBase} constructor. */
+  protected SimpleDoFnRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        addCounterMutator,
+        windowingStrategy);
+  }
+
+  /**
+   * Processes {@code elem} by calling the {@link DoFn}'s {@code processElement}; any
+   * exception it throws is rethrown after being passed through
+   * {@code wrapUserCodeException}.
+   */
+  @Override
+  protected void invokeProcessElement(WindowedValue<InputT> elem) {
+    DoFn<InputT, OutputT>.ProcessContext context = createProcessContext(elem);
+    try {
+      // User code may run here, so whatever it throws is wrapped before propagating.
+      fn.processElement(context);
+    } catch (Exception userFailure) {
+      throw wrapUserCodeException(userFailure);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
new file mode 100644
index 0000000..2eeee54
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}.
+ *
+ * <p>Every input value is added to a single piece of combining state (the buffer); when a
+ * trigger fires, the current contents of that state are read and output.
+ *
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of values associated with the key.
+ * @param <AccumT> The accumulator type of the buffer (for example the accumulator of the
+ *     combine function passed to {@link #combining}).
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W> The type of windows this operates on.
+ */
+public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
+    extends ReduceFn<K, InputT, OutputT, W> {
+  private static final String BUFFER_NAME = "buf";
+
+  /**
+   * Create a {@link SystemReduceFn} that buffers all of the input values in persistent
+   * state and produces an {@code Iterable<T>}.
+   *
+   * @param inputCoder the coder used for the buffered input values
+   */
+  public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
+      buffering(final Coder<T> inputCoder) {
+    final StateTag<Object, BagState<T>> bufferTag =
+        StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
+    return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
+      @Override
+      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
+        StateMerging.prefetchBags(state, bufferTag);
+      }
+
+      @Override
+      public void onMerge(OnMergeContext c) throws Exception {
+        StateMerging.mergeBags(c.state(), bufferTag);
+      }
+    };
+  }
+
+  /**
+   * Create a {@link SystemReduceFn} that combines all of the input values using the combine
+   * function held by {@code combineFn}.
+   *
+   * @param keyCoder the coder for the key type (not used by this method's body)
+   * @param combineFn the applied combine function supplying the function and its
+   *     accumulator coder
+   */
+  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow>
+      SystemReduceFn<K, InputT, AccumT, OutputT, W> combining(
+          final Coder<K> keyCoder,
+          final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
+    // The state tag is built differently depending on whether the combine function is
+    // the context-aware variant.
+    if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
+      bufferTag = StateTags.makeSystemTagInternal(
+          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
+              BUFFER_NAME, combineFn.getAccumulatorCoder(),
+              (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+
+    } else {
+      bufferTag = StateTags.makeSystemTagInternal(
+            StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
+                BUFFER_NAME, combineFn.getAccumulatorCoder(),
+                (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+    }
+    return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
+      @Override
+      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
+        StateMerging.prefetchCombiningValues(state, bufferTag);
+      }
+
+      @Override
+      public void onMerge(OnMergeContext c) throws Exception {
+        StateMerging.mergeCombiningValues(c.state(), bufferTag);
+      }
+    };
+  }
+
+  /** Tag of the state that holds the buffered or combined input values; never reassigned. */
+  private final StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
+
+  /**
+   * @param bufferTag tag of the state that input values are added to and output is read from
+   */
+  public SystemReduceFn(
+      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
+    this.bufferTag = bufferTag;
+  }
+
+  /** Adds the value being processed to the buffer state. */
+  @Override
+  public void processValue(ProcessValueContext c) throws Exception {
+    c.state().access(bufferTag).add(c.value());
+  }
+
+  /** Requests a later read of the buffer state via {@code readLater()}. */
+  @Override
+  public void prefetchOnTrigger(StateAccessor<K> state) {
+    state.access(bufferTag).readLater();
+  }
+
+  /** Outputs the current contents of the buffer state. */
+  @Override
+  public void onTrigger(OnTriggerContext c) throws Exception {
+    c.output(c.state().access(bufferTag).read());
+  }
+
+  /** Clears the buffer state. */
+  @Override
+  public void clearState(Context c) throws Exception {
+    c.state().access(bufferTag).clear();
+  }
+
+  /** Returns the buffer state's own emptiness indicator. */
+  @Override
+  public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+    return state.access(bufferTag).isEmpty();
+  }
+}

Reply via email to