Moves OldDoFn to runners-core

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f8b8c5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f8b8c5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f8b8c5b

Branch: refs/heads/master
Commit: 5f8b8c5b06cfd49c4293a20dff2eea99f1076444
Parents: 77c7505
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Tue Jan 17 16:12:39 2017 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Jan 20 13:31:58 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   2 +-
 .../beam/runners/core/AssignWindowsDoFn.java    |   3 +-
 .../apache/beam/runners/core/DoFnAdapters.java  |   1 -
 .../apache/beam/runners/core/DoFnRunner.java    |   1 -
 .../apache/beam/runners/core/DoFnRunners.java   |   1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   1 -
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   1 -
 .../core/LateDataDroppingDoFnRunner.java        |   1 -
 .../org/apache/beam/runners/core/OldDoFn.java   | 472 +++++++++++++++++++
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   3 +-
 .../core/DoFnDelegatingAggregatorTest.java      | 144 ++++++
 .../apache/beam/runners/core/NoOpOldDoFn.java   |  72 +++
 .../beam/runners/core/OldDoFnContextTest.java   |  72 +++
 .../apache/beam/runners/core/OldDoFnTest.java   | 192 ++++++++
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   2 +-
 .../runners/flink/OldPerKeyCombineFnRunner.java |   2 +-
 .../flink/OldPerKeyCombineFnRunners.java        |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../FlinkMultiOutputProcessContext.java         |   2 +-
 .../functions/FlinkNoElementAssignContext.java  |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../FlinkSingleOutputProcessContext.java        |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |   4 +-
 .../sdk/transforms/DelegatingAggregator.java    |   2 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 465 ------------------
 .../org/apache/beam/sdk/util/NameUtils.java     |   2 +-
 .../DoFnDelegatingAggregatorTest.java           | 142 ------
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |  71 ---
 .../beam/sdk/transforms/OldDoFnContextTest.java |  69 ---
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 187 --------
 .../org/apache/beam/sdk/util/NameUtilsTest.java |  20 +-
 39 files changed, 982 insertions(+), 978 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index ef049e1..50af81d 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -23,8 +23,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
 import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 173434f..4c2b461 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -44,6 +44,7 @@ import 
org.apache.beam.runners.apex.translation.utils.SerializablePipelineOption
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -52,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index de4c15d..808001e 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.sdk.coders.Coder;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
index 0eb1667..bbf3574 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
@@ -21,8 +21,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 0f5624f..23aba58 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 7c73a34..66f95db 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 2f3e93c..f3972ae 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -22,7 +22,6 @@ import 
org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index d79683a..ecce4fc 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -21,7 +21,6 @@ import 
org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 9a2f8fd..7e96136 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 290171a..9436ccf 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -22,7 +22,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
new file mode 100644
index 0000000..b099721
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * The argument to {@link ParDo} providing the code to use to process
+ * elements of the input
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ *
+ * <p>See {@link ParDo} for more explanation, examples of use, and
+ * discussion of constraints on {@code OldDoFn}s, including their
+ * serializability, lack of access to global shared mutable state,
+ * requirements for failure tolerance, and benefits of optimization.
+ *
+ * <p>{@code OldDoFn}s can be tested in the context of a particular
+ * {@code Pipeline} by running that {@code Pipeline} on sample input
+ * and then checking its output.  Unit testing of a {@code OldDoFn},
+ * separately from any {@code ParDo} transform or {@code Pipeline},
+ * can be done via the {@link DoFnTester} harness.
+ *
+ * <p>{@link DoFn} (currently experimental) offers an alternative
+ * mechanism for accessing {@link ProcessContext#window()} without the need
+ * to implement {@link RequiresWindowAccess}.
+ *
+ * <p>See also {@link #processElement} for details on implementing the 
transformation
+ * from {@code InputT} to {@code OutputT}.
+ *
+ * @param <InputT> the type of the (main) input elements
+ * @param <OutputT> the type of the (main) output elements
+ * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link 
DoFn}.
+ */
+@Deprecated
+public abstract class OldDoFn<InputT, OutputT> implements Serializable, 
HasDisplayData {
+  /**
+   * Information accessible to all methods in this {@code OldDoFn}.
+   * Used primarily to output elements.
+   */
+  public abstract class Context {
+
+    /**
+     * Returns the {@code PipelineOptions} specified with the
+     * {@link org.apache.beam.sdk.runners.PipelineRunner}
+     * invoking this {@code OldDoFn}.  The {@code PipelineOptions} will
+     * be the default running via {@link DoFnTester}.
+     */
+    public abstract PipelineOptions getPipelineOptions();
+
+    /**
+     * Adds the given element to the main output {@code PCollection}.
+     *
+     * <p>Once passed to {@code output} the element should be considered
+     * immutable and not be modified in any way. It may be cached or retained
+     * by a Beam runner or later steps in the pipeline, or used in
+     * other unspecified ways.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the 
output
+     * element will have the same timestamp and be in the same windows
+     * as the input element passed to {@link OldDoFn#processElement 
processElement}.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link 
#finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     */
+    public abstract void output(OutputT output);
+
+    /**
+     * Adds the given element to the main output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code outputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the 
timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The 
output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link 
#finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     */
+    public abstract void outputWithTimestamp(OutputT output, Instant 
timestamp);
+
+    /**
+     * Adds the given element to the side output {@code PCollection} with the
+     * given tag.
+     *
+     * <p>Once passed to {@code sideOutput} the element should not be modified
+     * in any way.
+     *
+     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags 
withOutputTags} to
+     * specify the tags of side outputs that it consumes. Non-consumed side
+     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
+     * need to be specified.
+     *
+     * <p>The output element will have the same timestamp and be in the same
+     * windows as the input element passed to {@link OldDoFn#processElement 
processElement}.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link 
#finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+    /**
+     * Adds the given element to the specified side output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code sideOutputWithTimestamp} the element should 
not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the 
timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The 
output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link 
#finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutputWithTimestamp(
+        TupleTag<T> tag, T output, Instant timestamp);
+
+    /**
+     * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
+     * specified name and aggregation logic specified by {@link CombineFn}.
+     *
+     * <p>For internal use only.
+     *
+     * @param name the name of the aggregator
+     * @param combiner the {@link CombineFn} to use in the aggregator
+     * @return an aggregator for the provided name and {@link CombineFn} in 
this
+     *         context
+     */
+    @Experimental(Kind.AGGREGATOR)
+    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+        createAggregatorInternal(String name, CombineFn<AggInputT, ?, 
AggOutputT> combiner);
+
+    /**
+     * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
+     * usable within this context.
+     *
+     * <p>This method should be called by runners before {@link 
OldDoFn#startBundle}
+     * is executed.
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected final void setupDelegateAggregators() {
+      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+        setupDelegateAggregator(aggregator);
+      }
+
+      aggregatorsAreFinal = true;
+    }
+
+    private <AggInputT, AggOutputT> void setupDelegateAggregator(
+        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+      Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
+          aggregator.getName(), aggregator.getCombineFn());
+
+      aggregator.setDelegate(delegate);
+    }
+  }
+
+  /**
+   * Information accessible when running {@link OldDoFn#processElement}.
+   */
+  public abstract class ProcessContext extends Context {
+
+    /**
+     * Returns the input element to be processed.
+     *
+     * <p>The element should be considered immutable. A Beam runner will not 
mutate the
+     * element, so it is safe to cache, etc. The element should not be mutated 
by any of the
+     * {@link OldDoFn} methods, because it may be cached elsewhere, retained 
by the runner
+     * runtime, or used in other unspecified ways.
+     */
+    public abstract InputT element();
+
+    /**
+     * Returns the value of the side input for the window corresponding to the
+     * window of the main input element.
+     *
+     * <p>See
+     * {@link 
org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
+     * for how this corresponding window is determined.
+     *
+     * @throws IllegalArgumentException if this is not a side input
+     * @see ParDo#withSideInputs
+     */
+    public abstract <T> T sideInput(PCollectionView<T> view);
+
+    /**
+     * Returns the timestamp of the input element.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract Instant timestamp();
+
+    /**
+     * Returns the window into which the input element has been assigned.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     *
+     * @throws UnsupportedOperationException if this {@link OldDoFn} does
+     * not implement {@link RequiresWindowAccess}.
+     */
+    public abstract BoundedWindow window();
+
+    /**
+     * Returns information about the pane within this window into which the
+     * input element has been assigned.
+     *
+     * <p>Generally all data is in a single, uninteresting pane unless custom
+     * triggering and/or late data has been explicitly requested.
+     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract PaneInfo pane();
+
+    /**
+     * Returns the process context to use for implementing windowing.
+     */
+    @Experimental
+    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
+  }
+
+  /**
+   * Returns the allowed timestamp skew duration, which is the maximum
+   * duration that timestamps can be shifted backward in
+   * {@link OldDoFn.Context#outputWithTimestamp}.
+   *
+   * <p>The default value is {@code Duration.ZERO}, in which case
+   * timestamps can only be shifted forward to future.  For infinite
+   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+   *
+   * <p>Note that producing an element whose timestamp is less than the
+   * current timestamp may result in late data, i.e. returning a non-zero
+   * value here does not impact watermark calculations used for firing
+   * windows.
+   *
+   * @deprecated does not interact well with the watermark.
+   */
+  @Deprecated
+  public Duration getAllowedTimestampSkew() {
+    return Duration.ZERO;
+  }
+
+  /**
+   * Interface for signaling that a {@link OldDoFn} needs to access the window 
the
+   * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
+   */
+  @Experimental
+  public interface RequiresWindowAccess {}
+
+  public OldDoFn() {
+    this(new HashMap<String, DelegatingAggregator<?, ?>>());
+  }
+
+  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+    this.aggregators = aggregators;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
+
+  /**
+   * Protects aggregators from being created after initialization.
+   */
+  private boolean aggregatorsAreFinal;
+
+  /**
+   * Prepares this {@link DoFn} instance for processing bundles.
+   *
+   * <p>{@link #setup()} will be called at most once per {@link DoFn} 
instance, and before any other
+   * {@link DoFn} method is called.
+   *
+   * <p>By default, does nothing.
+   */
+  public void setup() throws Exception {
+  }
+
+  /**
+   * Prepares this {@code OldDoFn} instance for processing a batch of elements.
+   *
+   * <p>By default, does nothing.
+   */
+  public void startBundle(Context c) throws Exception {
+  }
+
+  /**
+   * Processes one input element.
+   *
+   * <p>The current element of the input {@code PCollection} is returned by
+   * {@link ProcessContext#element() c.element()}. It should be considered 
immutable. The Beam
+   * runner will not mutate the element, so it is safe to cache, etc. The 
element should not be
+   * mutated by any of the {@link OldDoFn} methods, because it may be cached 
elsewhere, retained by
+   * the Beam runner, or used in other unspecified ways.
+   *
+   * <p>A value is added to the main output {@code PCollection} by {@link 
ProcessContext#output}.
+   * Once passed to {@code output} the element should be considered immutable 
and not be modified in
+   * any way. It may be cached elsewhere, retained by the Beam runner, or used 
in other
+   * unspecified ways.
+   *
+   * @see ProcessContext
+   */
+  public abstract void processElement(ProcessContext c) throws Exception;
+
+  /**
+   * Finishes processing this batch of elements.
+   *
+   * <p>By default, does nothing.
+   */
+  public void finishBundle(Context c) throws Exception {
+  }
+
+  /**
+   * Cleans up this {@link DoFn}.
+   *
+   * <p>{@link #teardown()} will be called before the {@link PipelineRunner} 
discards a {@link DoFn}
+   * instance, including due to another {@link DoFn} method throwing an {@link 
Exception}. No other
+   * {@link DoFn} methods will be called after a call to {@link #teardown()}.
+   *
+   * <p>By default, does nothing.
+   */
+  public void teardown() throws Exception {
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may 
override this method
+   * to provide their own display data.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns an {@link Aggregator} with aggregation logic specified by the
+   * {@link CombineFn} argument. The name provided must be unique across
+   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be 
created
+   * during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link CombineFn} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this OldDoFn
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline processing.
+   */
+  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+      createAggregator(String name, CombineFn<? super AggInputT, ?, 
AggOutputT> combiner) {
+    checkNotNull(name, "name cannot be null");
+    checkNotNull(combiner, "combiner cannot be null");
+    checkArgument(!aggregators.containsKey(name),
+        "Cannot create aggregator with name %s."
+        + " An Aggregator with that name already exists within this scope.",
+        name);
+
+    checkState(!aggregatorsAreFinal, "Cannot create an aggregator during 
OldDoFn processing."
+        + " Aggregators should be registered during pipeline construction.");
+
+    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
+        new DelegatingAggregator<>(name, combiner);
+    aggregators.put(name, aggregator);
+    return aggregator;
+  }
+
+  /**
+   * Returns an {@link Aggregator} with the aggregation logic specified by the
+   * {@link SerializableFunction} argument. The name provided must be unique
+   * across {@link Aggregator}s created within the OldDoFn. Aggregators can 
only be
+   * created during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link SerializableFunction} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this OldDoFn
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline processing.
+   */
+  protected final <AggInputT> Aggregator<AggInputT, AggInputT> 
createAggregator(String name,
+      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
+    checkNotNull(combiner, "combiner cannot be null.");
+    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
+  }
+
+  /**
+   * Returns the {@link Aggregator Aggregators} created by this {@code 
OldDoFn}.
+   */
+  Collection<Aggregator<?, ?>> getAggregators() {
+    return Collections.<Aggregator<?, 
?>>unmodifiableCollection(aggregators.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 9808e56..2fe9226 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -27,11 +27,10 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
new file mode 100644
index 0000000..b44e8a4
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DelegatingAggregator;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DelegatingAggregator}.
+ */
+@RunWith(JUnit4.class)
+public class DoFnDelegatingAggregatorTest {
+
+  @Mock
+  private Aggregator<Long, Long> delegate;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testAddValueWithoutDelegateThrowsException() {
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, 
combiner);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("cannot be called");
+    thrown.expectMessage("DoFn");
+
+    aggregator.addValue(21.2);
+  }
+
+  @Test
+  public void testSetDelegateThenAddValueCallsDelegate() {
+    String name = "agg";
+    CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Long, Long> aggregator =
+        (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, 
combiner);
+
+    aggregator.setDelegate(delegate);
+
+    aggregator.addValue(12L);
+
+    verify(delegate).addValue(12L);
+  }
+
+  @Test
+  public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, 
combiner);
+
+    @SuppressWarnings("unchecked")
+    Aggregator<Double, Double> secondDelegate =
+        mock(Aggregator.class, "secondDelegate");
+
+    aggregator.setDelegate(aggregator);
+    aggregator.setDelegate(secondDelegate);
+
+    aggregator.addValue(2.25);
+
+    verify(secondDelegate).addValue(2.25);
+    verify(delegate, never()).addValue(anyLong());
+  }
+
+  @Test
+  public void testGetNameReturnsName() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, 
combiner);
+
+    assertEquals(name, aggregator.getName());
+  }
+
+  @Test
+  public void testGetCombineFnReturnsCombineFn() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, 
combiner);
+
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> CombineFn<T, ?, T> mockCombineFn(
+      @SuppressWarnings("unused") Class<T> clazz) {
+    return mock(CombineFn.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
new file mode 100644
index 0000000..5cbea8c
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A {@link OldDoFn} that does nothing with provided elements. Used for testing
+ * methods provided by the {@link OldDoFn} abstract class.
+ *
+ * @param <InputT> unused.
+ * @param <OutputT> unused.
+ */
+class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+  @Override
+  public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws 
Exception {
+  }
+
+  /**
+   * Returns a new NoOp Context.
+   */
+  public OldDoFn<InputT, OutputT>.Context context() {
+    return new NoOpDoFnContext();
+  }
+
+  /**
+   * A {@link OldDoFn.Context} that does nothing and returns exclusively null.
+   */
+  private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+    @Override
+    public void output(OutputT output) {
+    }
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+    }
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+    }
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
+        Instant timestamp) {
+    }
+    @Override
+    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+        createAggregatorInternal(String name, CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
new file mode 100644
index 0000000..a1cd49d
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Sum;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link OldDoFn.Context}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnContextTest {
+
+  @Mock
+  private Aggregator<Long, Long> agg;
+
+  private OldDoFn<Object, Object> fn;
+  private OldDoFn<Object, Object>.Context context;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    // Need to be real objects to call the constructor, and to reference the
+    // outer instance of OldDoFn
+    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
+    OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
+
+    fn = spy(noOpFn);
+    context = spy(noOpContext);
+  }
+
+  @Test
+  public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() 
{
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
+    Aggregator<Long, Long> delegateAggregator =
+        fn.createAggregator("test", combiner);
+
+    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+    context.setupDelegateAggregators();
+    delegateAggregator.addValue(1L);
+
+    verify(agg).addValue(1L);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
new file mode 100644
index 0000000..651bc72
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for OldDoFn.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnTest implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testCreateAggregatorWithCombinerSucceeds() {
+    String name = "testAggregator";
+    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
+
+    assertEquals(name, aggregator.getName());
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullNameThrowsException() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("name cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator(null, Sum.ofLongs());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullCombineFnThrowsException() {
+    CombineFn<Object, Object, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+    SerializableFunction<Iterable<Object>, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithSameNameThrowsException() {
+    String name = "testAggregator";
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    doFn.createAggregator(name, combiner);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot create");
+    thrown.expectMessage(name);
+    thrown.expectMessage("already exists");
+
+    doFn.createAggregator(name, combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+    String nameOne = "testAggregator";
+    String nameTwo = "aggregatorPrime";
+    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    Aggregator<Double, Double> aggregatorOne =
+        doFn.createAggregator(nameOne, combiner);
+    Aggregator<Double, Double> aggregatorTwo =
+        doFn.createAggregator(nameTwo, combiner);
+
+    assertNotEquals(aggregatorOne, aggregatorTwo);
+  }
+
+  @Test
+  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws 
Exception {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception { }
+    };
+    OldDoFn<String, String>.Context context = createContext(fn);
+    context.setupDelegateAggregators();
+
+    thrown.expect(isA(IllegalStateException.class));
+    fn.createAggregator("anyAggregate", Max.ofIntegers());
+  }
+
+  private OldDoFn<String, String>.Context createContext(OldDoFn<String, 
String> fn) {
+    return fn.new Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void output(String output) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void outputWithTimestamp(String output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <AggInputT, AggOutputT>
+      Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Test
+  public void testPopulateDisplayDataDefaultBehavior() {
+    OldDoFn<String, String> usesDefault =
+        new OldDoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {}
+        };
+
+    DisplayData data = DisplayData.from(usesDefault);
+    assertThat(data.items(), empty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 4610069..97da9ee 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -23,7 +23,7 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
+
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
index 5d676dc..71c3aa4 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.flink;
 
 import java.io.Serializable;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 
 /**
  * An interface that runs a {@link PerKeyCombineFn} with unified APIs using

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
index 8ebeadf..90894f2 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.runners.flink;
 
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 2a4a68e..8b2bcc6 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 1b43172..5ec6a77 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,12 +24,12 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index a97bd46..aeeabbf 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index a3d2b18..7882b5f 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Collection;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index c890272..ad7255b 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index f5a9087..7db30d1 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,12 +24,12 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 53b9803..e955679 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -24,11 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index a3fa0d4..81e37f4 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,12 +26,12 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
 import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
index 529b1cc..0db7f5a 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Collection;
 import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 90cdf4c..ac85b3c 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index cd6b5aa..d4273b2 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -42,12 +42,12 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 0c5be90..4d80a39 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,8 +40,7 @@ public class DoFnInfo<InputT, OutputT> implements 
Serializable {
   private final Map<Long, TupleTag<?>> outputMap;
 
   /**
-   * Creates a {@link DoFnInfo} for the given {@link Serializable} object, 
which is expected to be a
-   * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
+   * Creates a {@link DoFnInfo} for the given {@link DoFn}.
    */
   public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
       DoFn<InputT, OutputT> doFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
index e03d3b1..cfaf0a6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  * @param <AggInputT> the type of input element
  * @param <AggOutputT> the type of output element
  */
-class DelegatingAggregator<AggInputT, AggOutputT>
+public class DelegatingAggregator<AggInputT, AggOutputT>
     implements Aggregator<AggInputT, AggOutputT>, Serializable {
   private static final AtomicInteger ID_GEN = new AtomicInteger();
   private final int id;

Reply via email to