[ 
https://issues.apache.org/jira/browse/BEAM-3979?focusedWorklogId=95084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95084
 ]

ASF GitHub Bot logged work on BEAM-3979:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Apr/18 16:23
            Start Date: 25/Apr/18 16:23
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #4989: [BEAM-3979] Start 
completing the new DoFn vision: plumb context parameters into process functions.
URL: https://github.com/apache/beam/pull/4989
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 299ded7804c..8254620f265 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -50,6 +50,7 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
 
 /**
  * A utility transform that executes a <a
@@ -361,7 +362,18 @@ public void setup() {
     public void processElement(final ProcessContext c) {
       final InputT element = c.element().getKey();
       invoker.invokeSplitRestriction(
-          element, c.element().getValue(), part -> c.output(KV.of(element, 
part)));
+          element, c.element().getValue(), new OutputReceiver<RestrictionT>() {
+            @Override
+            public void output(RestrictionT part) {
+              c.output(KV.of(element, part));
+            }
+
+            @Override
+            public void outputWithTimestamp(RestrictionT part, Instant 
timestamp) {
+              throw new UnsupportedOperationException();
+            }
+          }
+      );
     }
   }
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index b1a3f3bdb62..e8f67a19671 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -29,10 +29,14 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
+import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -112,6 +116,33 @@ public Result invokeProcessElement(
             return processContext;
           }
 
+          @Override
+          public InputT element(DoFn<InputT, OutputT> doFn) {
+            return processContext.element();
+          }
+
+          @Override
+          public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+            return processContext.timestamp();
+          }
+
+          @Override
+          public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+            throw new UnsupportedOperationException(
+                "Access to time domain not supported in ProcessElement"
+            );
+          }
+
+          @Override
+          public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> 
doFn) {
+            return DoFnOutputReceivers.windowedReceiver(processContext, null);
+          }
+
+          @Override
+          public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, 
OutputT> doFn) {
+            return DoFnOutputReceivers.windowedMultiReceiver(processContext);
+          }
+
           @Override
           public RestrictionTracker<?, ?> restrictionTracker() {
             return tracker;
@@ -125,6 +156,12 @@ public BoundedWindow window() {
                 "Access to window of the element not supported in Splittable 
DoFn");
           }
 
+          @Override
+          public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+            throw new UnsupportedOperationException(
+                "Access to pane of the element not supported in Splittable 
DoFn");
+          }
+
           @Override
           public PipelineOptions pipelineOptions() {
             return pipelineOptions;
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 7e60b033e54..0abb71a4303 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -36,6 +36,9 @@
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -237,6 +240,12 @@ public BoundedWindow window() {
           "Cannot access window outside of @ProcessElement and @OnTimer 
methods.");
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
     @Override
     public PipelineOptions pipelineOptions() {
       return getPipelineOptions();
@@ -260,6 +269,36 @@ public PipelineOptions pipelineOptions() {
           "Cannot access ProcessContext outside of @ProcessElement method.");
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Element parameters are not supported outside of @ProcessElement 
method.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access timestamp outside of @ProcessElement method.");
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access time domain outside of @ProcessTimer method.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access output receiver outside of @ProcessElement method.");
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access output receiver outside of @ProcessElement method.");
+    }
+
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
@@ -307,6 +346,13 @@ public BoundedWindow window() {
           "Cannot access window outside of @ProcessElement and @OnTimer 
methods.");
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
+
     @Override
     public PipelineOptions pipelineOptions() {
       return getPipelineOptions();
@@ -330,6 +376,36 @@ public PipelineOptions pipelineOptions() {
           "Cannot access ProcessContext outside of @ProcessElement method.");
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access element outside of @ProcessElement method.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access timestamp outside of @ProcessElement method.");
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access time domain outside of @ProcessTimer method.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access outputReceiver in @FinishBundle method.");
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access outputReceiver in @FinishBundle method.");
+    }
+
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
@@ -458,6 +534,7 @@ public Instant timestamp() {
       return elem.getWindows();
     }
 
+
     @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
     private void checkTimestamp(Instant timestamp) {
       // The documentation of getAllowedTimestampSkew explicitly permits 
Long.MAX_VALUE to be used
@@ -481,6 +558,11 @@ public BoundedWindow window() {
       return Iterables.getOnlyElement(elem.getWindows());
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      return pane();
+    }
+
     @Override
     public PipelineOptions pipelineOptions() {
       return getPipelineOptions();
@@ -502,6 +584,32 @@ public PipelineOptions pipelineOptions() {
       return this;
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      return element();
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      return  timestamp();
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access time domain outside of @ProcessTimer method.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      return DoFnOutputReceivers.windowedMultiReceiver(this);
+    }
+
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
@@ -587,6 +695,12 @@ public BoundedWindow window() {
       return window;
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
     @Override
     public PipelineOptions pipelineOptions() {
       return getPipelineOptions();
@@ -614,6 +728,31 @@ public TimeDomain timeDomain() {
       throw new UnsupportedOperationException("ProcessContext parameters are 
not supported.");
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Element parameters are not 
supported.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      return timestamp();
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      return timeDomain();
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      return DoFnOutputReceivers.windowedMultiReceiver(this);
+    }
+
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       return this;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a3cdc88d46d..eb593f5bc46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -74,7 +74,7 @@
  * {@literal PCollection<String>} words =
  *     {@literal lines.apply(ParDo.of(new DoFn<String, String>())} {
  *         {@literal @ProcessElement}
- *          public void processElement(ProcessContext c, BoundedWindow window) 
{
+ *          public void processElement({@literal @}Element String element, 
BoundedWindow window) {
  *            ...
  *          }}));
  * </code></pre>
@@ -369,7 +369,14 @@ public Duration getAllowedTimestampSkew() {
   /** Receives values of the given type. */
   public interface OutputReceiver<T> {
     void output(T output);
+    void outputWithTimestamp(T output, Instant timestamp);
   }
+
+  /** Receives tagged output for a multi-output function. */
+  public interface MultiOutputReceiver {
+    <T> OutputReceiver<T> get(TupleTag<T> tag);
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -388,7 +395,7 @@ public Duration getAllowedTimestampSkew() {
    *
    *  {@literal @ProcessElement}
    *   public void processElement(
-   *       ProcessContext c,
+   *       {@literal @Element InputT element},
    *      {@literal @StateId("my-state-id") ValueState<MyState> myState}) {
    *     myState.read();
    *     myState.write(...);
@@ -427,7 +434,7 @@ public Duration getAllowedTimestampSkew() {
    *
    *   {@literal @ProcessElement}
    *    public void processElement(
-   *        ProcessContext c,
+   *       {@literal @Element InputT element},
    *       {@literal @TimerId("my-timer-id") Timer myTimer}) {
    *      myTimer.offset(Duration.standardSeconds(...)).setRelative();
    *    }
@@ -520,15 +527,28 @@ public Duration getAllowedTimestampSkew() {
    * <p>The signature of this method must satisfy the following constraints:
    *
    * <ul>
-   * <li>Its first argument must be a {@link DoFn.ProcessContext}.
    * <li>If one of its arguments is a subtype of {@link RestrictionTracker}, 
then it is a <a
    *     href="https://s.apache.org/splittable-do-fn">splittable</a> {@link 
DoFn} subject to the
    *     separate requirements described below. Items below are assuming this 
is not a splittable
    *     {@link DoFn}.
-   * <li>If one of its arguments is a subtype of {@link BoundedWindow} then it 
will
+   *  <li>If one of its arguments is tagged with the {@link Element} 
annotation, then it will be
+   *      passed the current element being processed; the argument type must 
match the input type
+   *      of this DoFn.
+   *  <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
+   *      passed the timestamp of the current element being processed; the 
argument must be of type
+   *      {@link Instant}.
+   * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will
    *     be passed the window of the current element. When applied by {@link 
ParDo} the subtype
    *     of {@link BoundedWindow} must match the type of windows on the input 
{@link PCollection}.
    *     If the window is not accessed a runner may perform additional 
optimizations.
+   *  <li>If one of its arguments is of type {@link PaneInfo}, then it will be 
passed information
+   *      about the current triggering pane.
+   *  <li>If one of the parameters is of type {@link PipelineOptions}, then it 
will be passed the
+   *      options for the current pipeline.
+   *  <li>If one of the parameters is of type {@link OutputReceiver}, then it 
will be passed an
+   *      output receiver for outputting elements to the default output.
+   *  <li>If one of the parameters is of type {@link MultiOutputReceiver}, 
then it will be passed
+   *      an output receiver for outputting to multiple tagged outputs.
    * <li>It must return {@code void}.
    * </ul>
    *
@@ -575,6 +595,22 @@ public Duration getAllowedTimestampSkew() {
   @Target(ElementType.METHOD)
   public @interface ProcessElement {}
 
+  /**
+   * Parameter annotation for the input element for a {@link ProcessElement} 
method.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.PARAMETER)
+  public @interface Element {}
+
+  /**
+   * Parameter annotation for the input element timestamp for a {@link 
ProcessElement} method.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.PARAMETER)
+  public @interface Timestamp {}
+
   /**
    * <b><i>Experimental - no backwards compatibility guarantees. The exact 
name or usage of this
    * feature may change.</i></b>
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
new file mode 100644
index 00000000000..2b97615f84d
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Common {@link OutputReceiver} and {@link MultiOutputReceiver} classes.
+ */
+public class DoFnOutputReceivers {
+  private static class WindowedContextOutputReceiver<T> implements 
OutputReceiver<T> {
+    DoFn<?, ?>.WindowedContext context;
+    @Nullable TupleTag<T> outputTag;
+    public WindowedContextOutputReceiver(DoFn<?, ?>.WindowedContext context,
+                                         @Nullable TupleTag<T> outputTag) {
+      this.context = context;
+      this.outputTag = outputTag;
+    }
+
+    @Override
+    public void output(T output) {
+      if (outputTag != null) {
+        context.output(outputTag, output);
+      } else {
+        ((DoFn<?, T>.WindowedContext) context).output(output);
+      }
+    }
+
+    @Override
+    public void outputWithTimestamp(T output, Instant timestamp) {
+      if (outputTag != null) {
+        context.outputWithTimestamp(outputTag, output, timestamp);
+      } else {
+        ((DoFn<?, T>.WindowedContext) context).outputWithTimestamp(output, 
timestamp);
+      }
+    }
+  }
+
+  private static class WindowedContextMultiOutputReceiver implements 
MultiOutputReceiver {
+    DoFn<?, ?>.WindowedContext context;
+    public WindowedContextMultiOutputReceiver(DoFn<?, ?>.WindowedContext 
context) {
+      this.context = context;
+    }
+
+    @Override
+    public <T> OutputReceiver<T> get(TupleTag<T> tag) {
+      return DoFnOutputReceivers.windowedReceiver(context, tag);
+    }
+  }
+
+  /**
+   * Returns a {@link OutputReceiver} that delegates to a {@link 
DoFn.WindowedContext}.
+   */
+  public static <T> OutputReceiver<T> windowedReceiver(DoFn<?, 
?>.WindowedContext context,
+                                                       @Nullable TupleTag<T> 
outputTag) {
+    return new WindowedContextOutputReceiver<>(context, outputTag);
+  }
+
+  /**
+   * Returns a {@link MultiOutputReceiver} that delegates to a {@link 
DoFn.WindowedContext}.
+   */
+  public static <T> MultiOutputReceiver windowedMultiReceiver(DoFn<?, 
?>.WindowedContext context) {
+    return new WindowedContextMultiOutputReceiver(context);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index ab76e9b7dfb..1785079053c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,9 +33,12 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -241,6 +244,11 @@ public BoundedWindow window() {
               return window;
             }
 
+            @Override
+            public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+              return processContext.pane();
+            }
+
             @Override
             public PipelineOptions pipelineOptions() {
               return getPipelineOptions();
@@ -265,6 +273,31 @@ public PipelineOptions pipelineOptions() {
               return processContext;
             }
 
+            @Override
+            public InputT element(DoFn<InputT, OutputT> doFn) {
+              return processContext.element();
+            }
+
+            @Override
+            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+              return processContext.timestamp();
+            }
+
+            @Override
+            public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+              throw new UnsupportedOperationException(
+                  "Not expected to access TimeDomain from @ProcessElement");   
         }
+
+            @Override
+            public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, 
OutputT> doFn) {
+              return DoFnOutputReceivers.windowedReceiver(processContext, 
null);
+            }
+
+            @Override
+            public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, 
OutputT> doFn) {
+              return DoFnOutputReceivers.windowedMultiReceiver(processContext);
+            }
+
             @Override
             public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
               throw new UnsupportedOperationException("DoFnTester doesn't 
support timers yet.");
@@ -654,6 +687,41 @@ public Void 
dispatch(DoFnSignature.Parameter.WindowParameter p) {
               return null;
             }
 
+            @Override
+            @Nullable
+            public Void dispatch(DoFnSignature.Parameter.ElementParameter p) {
+              return null;
+            }
+
+            @Override
+            @Nullable
+            public Void dispatch(DoFnSignature.Parameter.TimestampParameter p) 
{
+              return null;
+            }
+
+            @Override
+            @Nullable
+            public Void dispatch(DoFnSignature.Parameter.TimeDomainParameter 
p) {
+              return null;
+            }
+
+            @Override
+            @Nullable
+            public Void 
dispatch(DoFnSignature.Parameter.OutputReceiverParameter p) {
+              return null;
+            }
+
+            @Override
+            @Nullable
+            public Void 
dispatch(DoFnSignature.Parameter.TaggedOutputReceiverParameter p) {
+              return null;
+            }
+
+            @Override
+            @Nullable
+            public Void dispatch(DoFnSignature.Parameter.PaneInfoParameter p) {
+              return null;
+            }
             @Override
             protected Void dispatchDefault(DoFnSignature.Parameter p) {
               throw new UnsupportedOperationException(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index c31c4950e4f..6fd23b8bded 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -118,19 +118,19 @@
  * PCollection<String> words =
  *     lines.apply(ParDo.of(new DoFn<String, String>() {
  *        {@literal @}ProcessElement
- *         public void processElement(ProcessContext c) {
- *           String line = c.element();
+ *         public void processElement({@literal @}Element String line,
+ *           {@literal @}OutputReceiver<String> r) {
  *           for (String word : line.split("[^a-zA-Z']+")) {
- *             c.output(word);
+ *             r.output(word);
  *           }
  *         }}));
  * PCollection<Integer> wordLengths =
  *     words.apply(ParDo.of(new DoFn<String, Integer>() {
  *        {@literal @}ProcessElement
- *         public void processElement(ProcessContext c) {
- *           String word = c.element();
+ *         public void processElement({@literal @}Element String word,
+ *           {@literal @}OutputReceiver<Integer> r) {
  *           Integer length = word.length();
- *           c.output(length);
+ *           r.output(length);
  *         }}));
  * }</pre>
  *
@@ -221,22 +221,21 @@
  *             final TupleTag<String> specialWordsTag =
  *                 new TupleTag<String>(){};
  *            {@literal @}ProcessElement
- *             public void processElement(ProcessContext c) {
- *               String word = c.element();
+ *             public void processElement(@Element String word, 
MultiOutputReceiver r) {
  *               if (word.length() <= wordLengthCutOff) {
  *                 // Emit this short word to the main output.
- *                 c.output(word);
+ *                 r.output(wordsBelowCutOffTag, word);
  *               } else {
  *                 // Emit this long word's length to a specified output.
- *                 c.output(wordLengthsAboveCutOffTag, word.length());
+ *                 r.output(wordLengthsAboveCutOffTag, word.length());
  *               }
  *               if (word.startsWith("MARKER")) {
  *                 // Emit this word to a different specified output.
- *                 c.output(markedWordsTag, word);
+ *                 r.output(markedWordsTag, word);
  *               }
  *               if (word.startsWith("SPECIAL")) {
  *                 // Emit this word to the unconsumed output.
- *                 c.output(specialWordsTag, word);
+ *                 r.output(specialWordsTag, word);
  *               }
  *             }})
  *             // Specify the main and consumed output tags of the
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 91d9de70c3a..acc9d040d08 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -68,13 +68,19 @@
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StartBundleContextParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TaggedOutputReceiverParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomainParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -84,12 +90,17 @@
 /** Dynamically generates a {@link DoFnInvoker} instances for invoking a 
{@link DoFn}. */
 public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
 
-  public static final String CONTEXT_PARAMETER_METHOD = "context";
   public static final String START_BUNDLE_CONTEXT_PARAMETER_METHOD = 
"startBundleContext";
   public static final String FINISH_BUNDLE_CONTEXT_PARAMETER_METHOD = 
"finishBundleContext";
   public static final String PROCESS_CONTEXT_PARAMETER_METHOD = 
"processContext";
+  public static final String ELEMENT_PARAMETER_METHOD = "element";
+  public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp";
+  public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain";
+  public static final String OUTPUT_PARAMETER_METHOD = "outputReceiver";
+  public static final String TAGGED_OUTPUT_PARAMETER_METHOD = 
"taggedOutputReceiver";
   public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = 
"onTimerContext";
   public static final String WINDOW_PARAMETER_METHOD = "window";
+  public static final String PANE_INFO_PARAMETER_METHOD = "paneInfo";
   public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = 
"pipelineOptions";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = 
"restrictionTracker";
   public static final String STATE_PARAMETER_METHOD = "state";
@@ -585,6 +596,52 @@ public StackManipulation dispatch(ProcessContextParameter 
p) {
                         PROCESS_CONTEXT_PARAMETER_METHOD, DoFn.class)));
           }
 
+          @Override
+          public StackManipulation dispatch(ElementParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        ELEMENT_PARAMETER_METHOD, DoFn.class)),
+                TypeCasting.to(new 
TypeDescription.ForLoadedType(p.elementT().getRawType())));
+          }
+
+          @Override
+          public StackManipulation dispatch(TimestampParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        TIMESTAMP_PARAMETER_METHOD, DoFn.class)));
+          }
+
+          @Override
+          public StackManipulation dispatch(TimeDomainParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        TIME_DOMAIN_PARAMETER_METHOD, DoFn.class)));
+          }
+
+          @Override
+          public StackManipulation dispatch(OutputReceiverParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        OUTPUT_PARAMETER_METHOD, DoFn.class)));
+          }
+
+          @Override
+          public StackManipulation dispatch(TaggedOutputReceiverParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        TAGGED_OUTPUT_PARAMETER_METHOD, DoFn.class)));
+          }
+
           @Override
           public StackManipulation dispatch(OnTimerContextParameter p) {
             return new StackManipulation.Compound(
@@ -601,6 +658,15 @@ public StackManipulation dispatch(WindowParameter p) {
                 TypeCasting.to(new 
TypeDescription.ForLoadedType(p.windowT().getRawType())));
           }
 
+          @Override
+          public StackManipulation dispatch(PaneInfoParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        PANE_INFO_PARAMETER_METHOD, DoFn.class)));
+          }
+
           @Override
           public StackManipulation dispatch(RestrictionTrackerParameter p) {
             // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a 
RestrictionTracker,
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index ddd2c3f1d0a..f2cbecf7aac 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -21,15 +21,20 @@
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.joda.time.Instant;
 
 /**
  * Interface for invoking the {@code DoFn} processing methods.
@@ -103,6 +108,12 @@
      */
     BoundedWindow window();
 
+    /**
+     * Provides a {@link PaneInfo}.
+     */
+    PaneInfo paneInfo(DoFn<InputT, OutputT> doFn);
+
+
     /** Provide {@link PipelineOptions}. */
     PipelineOptions pipelineOptions();
 
@@ -120,6 +131,26 @@
     /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link 
DoFn}. */
     DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> 
doFn);
 
+    /** Provide the input element. */
+    InputT element(DoFn<InputT, OutputT> doFn);
+
+    /** Provide the timestamp of the input element. */
+    Instant timestamp(DoFn<InputT, OutputT> doFn);
+
+    /** Provide the time domain for a timer firing.
+     */
+    TimeDomain timeDomain(DoFn<InputT, OutputT> doFn);
+
+    /**
+     * Provide a {@link OutputReceiver} for outputting to the default output.
+     */
+    OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn);
+
+    /**
+     * Provide a {@link MultiOutputReceiver} for outputting to an output 
selected by its tag.
+     */
+    MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn);
+
     /**
      * If this is a splittable {@link DoFn}, returns the {@link 
RestrictionTracker} associated with
      * the current {@link ProcessElement} call.
@@ -146,6 +177,46 @@
               FakeArgumentProvider.class.getSimpleName()));
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
+    }
+
     @Override
     public BoundedWindow window() {
       throw new UnsupportedOperationException(
@@ -154,6 +225,14 @@ public BoundedWindow window() {
               FakeArgumentProvider.class.getSimpleName()));
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Should never call non-overridden methods of %s",
+              FakeArgumentProvider.class.getSimpleName()));
+    }
+
     @Override
     public PipelineOptions pipelineOptions() {
       throw new UnsupportedOperationException(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 219e0584403..25a0f0fb202 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -32,6 +32,7 @@
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
@@ -188,6 +189,8 @@ private Parameter() {}
         return cases.dispatch((OnTimerContextParameter) this);
       } else if (this instanceof WindowParameter) {
         return cases.dispatch((WindowParameter) this);
+      } else if (this instanceof PaneInfoParameter) {
+        return cases.dispatch((PaneInfoParameter) this);
       } else if (this instanceof RestrictionTrackerParameter) {
         return cases.dispatch((RestrictionTrackerParameter) this);
       } else if (this instanceof StateParameter) {
@@ -196,6 +199,16 @@ private Parameter() {}
         return cases.dispatch((TimerParameter) this);
       } else if (this instanceof PipelineOptionsParameter) {
         return cases.dispatch((PipelineOptionsParameter) this);
+      } else if (this instanceof ElementParameter) {
+        return cases.dispatch((ElementParameter) this);
+      } else if (this instanceof TimestampParameter) {
+        return cases.dispatch((TimestampParameter) this);
+      } else if (this instanceof OutputReceiverParameter) {
+        return cases.dispatch((OutputReceiverParameter) this);
+      } else if (this instanceof TaggedOutputReceiverParameter) {
+        return cases.dispatch((TaggedOutputReceiverParameter) this);
+      } else if (this instanceof TimeDomainParameter) {
+        return cases.dispatch((TimeDomainParameter) this);
       } else {
         throw new IllegalStateException(
             String.format("Attempt to case match on unknown %s subclass %s",
@@ -210,8 +223,14 @@ private Parameter() {}
       ResultT dispatch(StartBundleContextParameter p);
       ResultT dispatch(FinishBundleContextParameter p);
       ResultT dispatch(ProcessContextParameter p);
+      ResultT dispatch(ElementParameter p);
+      ResultT dispatch(TimestampParameter p);
+      ResultT dispatch(TimeDomainParameter p);
+      ResultT dispatch(OutputReceiverParameter p);
+      ResultT dispatch(TaggedOutputReceiverParameter p);
       ResultT dispatch(OnTimerContextParameter p);
       ResultT dispatch(WindowParameter p);
+      ResultT dispatch(PaneInfoParameter p);
       ResultT dispatch(RestrictionTrackerParameter p);
       ResultT dispatch(StateParameter p);
       ResultT dispatch(TimerParameter p);
@@ -239,6 +258,31 @@ public ResultT dispatch(ProcessContextParameter p) {
           return dispatchDefault(p);
         }
 
+        @Override
+        public ResultT dispatch(ElementParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
+        public ResultT dispatch(TaggedOutputReceiverParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
+        public ResultT dispatch(OutputReceiverParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
+        public ResultT dispatch(TimestampParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
+        public ResultT dispatch(TimeDomainParameter p) {
+          return dispatchDefault(p);
+        }
+
         @Override
         public ResultT dispatch(OnTimerContextParameter p) {
           return dispatchDefault(p);
@@ -249,6 +293,11 @@ public ResultT dispatch(WindowParameter p) {
           return dispatchDefault(p);
         }
 
+        @Override
+        public ResultT dispatch(PaneInfoParameter p) {
+          return dispatchDefault(p);
+        }
+
         @Override
         public ResultT dispatch(RestrictionTrackerParameter p) {
           return dispatchDefault(p);
@@ -271,7 +320,7 @@ public ResultT dispatch(PipelineOptionsParameter p) {
       }
     }
 
-    // These parameter descriptors are constant
+    // These parameter descriptors are constant.
     private static final StartBundleContextParameter 
START_BUNDLE_CONTEXT_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_StartBundleContextParameter();
     private static final FinishBundleContextParameter 
FINISH_BUNDLE_CONTEXT_PARAMETER =
@@ -280,17 +329,51 @@ public ResultT dispatch(PipelineOptionsParameter p) {
           new AutoValue_DoFnSignature_Parameter_ProcessContextParameter();
     private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter();
+    private static final TimestampParameter TIMESTAMP_PARAMETER =
+        new AutoValue_DoFnSignature_Parameter_TimestampParameter();
+    private static final PaneInfoParameter PANE_INFO_PARAMETER =
+        new AutoValue_DoFnSignature_Parameter_PaneInfoParameter();
+    private static final TimeDomainParameter TIME_DOMAIN_PARAMETER =
+        new AutoValue_DoFnSignature_Parameter_TimeDomainParameter();
+    private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER =
+        new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter();
+    private static final TaggedOutputReceiverParameter 
TAGGED_OUTPUT_RECEIVER_PARAMETER =
+        new AutoValue_DoFnSignature_Parameter_TaggedOutputReceiverParameter();
 
     /** Returns a {@link ProcessContextParameter}. */
     public static ProcessContextParameter processContext() {
       return PROCESS_CONTEXT_PARAMETER;
     }
 
+    public static ElementParameter elementParameter(TypeDescriptor<?> 
elementT) {
+      return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT);
+    }
+
+    public static TimestampParameter timestampParameter() {
+      return TIMESTAMP_PARAMETER;
+    }
+
+    public static TimeDomainParameter timeDomainParameter() {
+      return TIME_DOMAIN_PARAMETER;
+    }
+
+    public static OutputReceiverParameter outputReceiverParameter() {
+      return OUTPUT_RECEIVER_PARAMETER;
+    }
+
+    public static TaggedOutputReceiverParameter 
taggedOutputReceiverParameter() {
+      return TAGGED_OUTPUT_RECEIVER_PARAMETER;
+    }
+
     /** Returns a {@link OnTimerContextParameter}. */
     public static OnTimerContextParameter onTimerContext() {
       return ON_TIMER_CONTEXT_PARAMETER;
     }
 
+    public static PaneInfoParameter paneInfoParameter() {
+      return PANE_INFO_PARAMETER;
+    }
+
     /** Returns a {@link WindowParameter}. */
     public static WindowParameter boundedWindow(TypeDescriptor<? extends 
BoundedWindow> windowT) {
       return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
@@ -357,6 +440,60 @@ public static TimerParameter 
timerParameter(TimerDeclaration decl) {
       ProcessContextParameter() {}
     }
 
+    /**
+     * Descriptor for a {@link Parameter} annotated with {@link DoFn.Element}.
+     *
+     * <p>Descriptors are distinguished by their element type, {@link #elementT()}.
+     */
+    @AutoValue
+    public abstract static class ElementParameter extends Parameter {
+      ElementParameter() {}
+
+      public abstract TypeDescriptor<?> elementT();
+    }
+
+    /**
+     * Descriptor for a {@link Parameter} annotated with {@link DoFn.Timestamp}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class TimestampParameter extends Parameter {
+      TimestampParameter() {}
+    }
+
+    /**
+     * Descriptor for a {@link Parameter} representing the time domain of a 
timer.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class TimeDomainParameter extends Parameter {
+      TimeDomainParameter() {
+      }
+    }
+
+    /**
+     * Descriptor for a {@link Parameter} of type {@link DoFn.OutputReceiver}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class OutputReceiverParameter extends Parameter {
+      OutputReceiverParameter() {}
+    }
+
+    /**
+     * Descriptor for a {@link Parameter} of type {@link MultiOutputReceiver}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class TaggedOutputReceiverParameter extends 
Parameter {
+      TaggedOutputReceiverParameter() {
+      }
+    }
+
     /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.OnTimerContext}.
      *
@@ -379,6 +516,17 @@ public static TimerParameter 
timerParameter(TimerDeclaration decl) {
       public abstract TypeDescriptor<? extends BoundedWindow> windowT();
     }
 
+    /**
+     * Descriptor for a {@link Parameter} of type
+     * {@link org.apache.beam.sdk.transforms.windowing.PaneInfo}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class PaneInfoParameter extends Parameter {
+      PaneInfoParameter() {}
+    }
+
     /**
      * Descriptor for a {@link Parameter} of a subclass of {@link 
RestrictionTracker}.
      *
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 2feea973d0a..64cd9878d41 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -46,9 +46,12 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
@@ -61,10 +64,12 @@
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
+import org.joda.time.Instant;
 
 /**
  * Utilities for working with {@link DoFnSignature}. See {@link #getSignature}.
@@ -79,7 +84,12 @@ private DoFnSignatures() {}
       ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
       ImmutableList.of(
           Parameter.ProcessContextParameter.class,
+          Parameter.ElementParameter.class,
+          Parameter.TimestampParameter.class,
+          Parameter.OutputReceiverParameter.class,
+          Parameter.TaggedOutputReceiverParameter.class,
           Parameter.WindowParameter.class,
+          Parameter.PaneInfoParameter.class,
           Parameter.PipelineOptionsParameter.class,
           Parameter.TimerParameter.class,
           Parameter.StateParameter.class);
@@ -88,6 +98,10 @@ private DoFnSignatures() {}
       ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
           ImmutableList.of(
               Parameter.PipelineOptionsParameter.class,
+              Parameter.ElementParameter.class,
+              Parameter.TimestampParameter.class,
+              Parameter.OutputReceiverParameter.class,
+              Parameter.TaggedOutputReceiverParameter.class,
               Parameter.ProcessContextParameter.class,
               Parameter.RestrictionTrackerParameter.class);
 
@@ -95,8 +109,12 @@ private DoFnSignatures() {}
       ALLOWED_ON_TIMER_PARAMETERS =
           ImmutableList.of(
               Parameter.OnTimerContextParameter.class,
+              Parameter.TimestampParameter.class,
+              Parameter.TimeDomainParameter.class,
               Parameter.WindowParameter.class,
               Parameter.PipelineOptionsParameter.class,
+              Parameter.OutputReceiverParameter.class,
+              Parameter.TaggedOutputReceiverParameter.class,
               Parameter.TimerParameter.class,
               Parameter.StateParameter.class);
 
@@ -801,7 +819,19 @@ private static Parameter analyzeExtraParameter(
 
     ErrorReporter paramErrors = methodErrors.forParameter(param);
 
-    if (rawType.equals(DoFn.ProcessContext.class)) {
+    if (hasElementAnnotation(param.getAnnotations())) {
+      methodErrors.checkArgument(paramT.equals(inputT),
+          "@Element argument must have type %s", inputT);
+      return Parameter.elementParameter(paramT);
+    }  else if (hasTimestampAnnotation(param.getAnnotations())) {
+      methodErrors.checkArgument(rawType.equals(Instant.class),
+          "@Timestamp argument must have type org.joda.time.Instant.");
+      return Parameter.timestampParameter();
+    } else if (rawType.equals(TimeDomain.class)) {
+      return Parameter.timeDomainParameter();
+    } else if (rawType.equals(PaneInfo.class)) {
+      return Parameter.paneInfoParameter();
+    } else if (rawType.equals(DoFn.ProcessContext.class)) {
       paramErrors.checkArgument(paramT.equals(expectedProcessContextT),
         "ProcessContext argument must have type %s",
         formatType(expectedProcessContextT));
@@ -818,6 +848,15 @@ private static Parameter analyzeExtraParameter(
           "Multiple %s parameters",
           BoundedWindow.class.getSimpleName());
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) 
paramT);
+    } else if (rawType.equals(OutputReceiver.class)) {
+      TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(outputT);
+      paramErrors.checkArgument(
+          paramT.equals(expectedReceiverT),
+      "OutputReceiver should be parameterized by %s",
+          outputT);
+      return Parameter.outputReceiverParameter();
+    }  else if (rawType.equals(MultiOutputReceiver.class)) {
+      return Parameter.taggedOutputReceiverParameter();
     } else if (PipelineOptions.class.equals(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasPipelineOptionsParamter(),
@@ -937,6 +976,24 @@ private static String getStateId(List<Annotation> 
annotations) {
     return null;
   }
 
+  private static boolean hasElementAnnotation(List<Annotation> annotations) {
+    for (Annotation anno : annotations) {
+      if (anno.annotationType().equals(DoFn.Element.class)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean hasTimestampAnnotation(List<Annotation> annotations) {
+    for (Annotation anno : annotations) {
+      if (anno.annotationType().equals(DoFn.Timestamp.class)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Nullable
   private static TypeDescriptor<?> getTrackerType(TypeDescriptor<?> fnClass, 
Method method) {
     Type[] params = method.getGenericParameterTypes();
@@ -1028,9 +1085,9 @@ private static String getStateId(List<Annotation> 
annotations) {
    * OutputT}.
    */
   private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> 
outputReceiverTypeOf(
-      TypeDescriptor<OutputT> inputT) {
+      TypeDescriptor<OutputT> outputT) {
     return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where(
-        new TypeParameter<OutputT>() {}, inputT);
+        new TypeParameter<OutputT>() {}, outputT);
   }
 
   @VisibleForTesting
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 920fba7c8cc..1287e357f61 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -139,8 +139,11 @@
 
   private static class PrintingDoFn extends DoFn<String, String> {
     @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      c.output(c.element() + ":" + c.timestamp().getMillis()
+    public void processElement(@Element String element,
+                               @Timestamp Instant timestamp,
+                               BoundedWindow window,
+                               OutputReceiver<String> receiver) {
+      receiver.output(element + ":" + timestamp.getMillis()
           + ":" + window.maxTimestamp().getMillis());
     }
   }
@@ -183,11 +186,11 @@ public void startBundle() {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
+    public void processElement(ProcessContext c, @Element Integer element) {
       assertThat(state,
                  anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
       state = State.PROCESSING;
-      outputToAllWithSideInputs(c, "processing: " + c.element());
+      outputToAllWithSideInputs(c, "processing: " + element);
     }
 
     @FinishBundle
@@ -260,9 +263,8 @@ public void processElement(ProcessContext c) {
 
   static class TestOutputTimestampDoFn<T extends Number> extends DoFn<T, T> {
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      T value = c.element();
-      c.outputWithTimestamp(value, new Instant(value.longValue()));
+    public void processElement(@Element T value, OutputReceiver<T> r) {
+      r.outputWithTimestamp(value, new Instant(value.longValue()));
     }
   }
 
@@ -281,19 +283,19 @@ public Duration getAllowedTimestampSkew() {
       return allowedTimestampSkew;
     }
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      Instant timestamp = c.timestamp();
+    public void processElement(@Element T value, @Timestamp Instant timestamp,
+                               OutputReceiver<T> r) {
       checkNotNull(timestamp);
-      T value = c.element();
-      c.outputWithTimestamp(value, timestamp.plus(durationToShift));
+      r.outputWithTimestamp(value, timestamp.plus(durationToShift));
     }
   }
 
   static class TestFormatTimestampDoFn<T extends Number> extends DoFn<T, 
String> {
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      checkNotNull(c.timestamp());
-      c.output("processing: " + c.element() + ", timestamp: " + 
c.timestamp().getMillis());
+    public void processElement(@Element T element, @Timestamp Instant 
timestamp,
+                               OutputReceiver<String> r) {
+      checkNotNull(timestamp);
+      r.output("processing: " + element + ", timestamp: " + 
timestamp.getMillis());
     }
   }
 
@@ -318,9 +320,10 @@ public PCollectionTuple expand(PCollection<Integer> input) 
{
       }
 
       @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        if (c.element() % divisor == 0) {
-          c.output(c.element());
+      public void processElement(@Element Integer element,
+                                 OutputReceiver<Integer> r) throws Exception {
+        if (element % divisor == 0) {
+          r.output(element);
         }
       }
     }
@@ -515,8 +518,9 @@ public void testParDoWithOnlyTaggedOutput() {
         .apply(ParDo
             .of(new DoFn<Integer, Void>() {
                 @ProcessElement
-                public void processElement(ProcessContext c) {
-                  c.output(additionalOutputTag, c.element());
+                public void processElement(@Element Integer element,
+                                           MultiOutputReceiver r) {
+                  r.get(additionalOutputTag).output(element);
                 }})
             .withOutputTags(mainOutputTag, 
TupleTagList.of(additionalOutputTag)));
 
@@ -720,8 +724,8 @@ private FnWithSideInputs(PCollectionView<Integer> view) {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element() + ":" + c.sideInput(view));
+    public void processElement(ProcessContext c, @Element String element) {
+      c.output(element + ":" + c.sideInput(view));
     }
   }
 
@@ -830,7 +834,7 @@ public void 
testParDoTransformNameBasedDoFnWithTrimmedSuffix() {
   @Test
   public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() {
     assertThat(
-        ParDo.of(new TaggedOutputDummyFn(null)).withOutputTags(null, 
null).getName(),
+        ParDo.of(new TaggedOutputDummyFn(null, null)).withOutputTags(null, 
null).getName(),
         containsString("ParMultiDo(TaggedOutputDummy)"));
   }
 
@@ -881,10 +885,10 @@ public void 
testMultiOutputAppliedMultipleTimesDifferentOutputs() {
     DoFn<Long, Long> fn =
         new DoFn<Long, Long>() {
           @ProcessElement
-          public void processElement(ProcessContext cxt) {
+          public void processElement(ProcessContext cxt, @Element Long 
element) {
             cxt.output(cxt.element());
             cxt.output(valueAsString, Long.toString(cxt.element()));
-            cxt.output(valueAsInt, Long.valueOf(cxt.element()).intValue());
+            cxt.output(valueAsInt, element.intValue());
           }
         };
 
@@ -950,8 +954,8 @@ public void testJsonEscaping() {
     // Declare an arbitrary function and make sure we can serialize it
     DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() {
       @ProcessElement
-      public void processElement(ProcessContext c) {
-        c.output(c.element() + 1);
+      public void processElement(@Element Integer element, 
OutputReceiver<Integer> r) {
+        r.output(element + 1);
       }
     };
 
@@ -1001,28 +1005,33 @@ public void verifyDeterministic() {}
   }
 
   private static class TaggedOutputDummyFn extends DoFn<Integer, Integer> {
+    private TupleTag<Integer> mainOutputTag;
     private TupleTag<TestDummy> dummyOutputTag;
-    public TaggedOutputDummyFn(TupleTag<TestDummy> dummyOutputTag) {
+    public TaggedOutputDummyFn(TupleTag<Integer> mainOutputTag,
+                               TupleTag<TestDummy> dummyOutputTag) {
+      this.mainOutputTag = mainOutputTag;
       this.dummyOutputTag = dummyOutputTag;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(1);
-      c.output(dummyOutputTag, new TestDummy());
+    public void processElement(MultiOutputReceiver r) {
+      r.get(mainOutputTag).output(1);
+      r.get(dummyOutputTag).output(new TestDummy());
      }
   }
 
   private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
+    private TupleTag<TestDummy> mainOutputTag;
     private TupleTag<Integer> intOutputTag;
-    public MainOutputDummyFn(TupleTag<Integer> intOutputTag) {
+    public MainOutputDummyFn(TupleTag<TestDummy> mainOutputTag, 
TupleTag<Integer> intOutputTag) {
+      this.mainOutputTag = mainOutputTag;
       this.intOutputTag = intOutputTag;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(new TestDummy());
-      c.output(intOutputTag, 1);
+    public void processElement(MultiOutputReceiver r) {
+      r.get(mainOutputTag).output(new TestDummy());
+      r.get(intOutputTag).output(1);
      }
   }
 
@@ -1174,7 +1183,7 @@ public void testTaggedOutputUnknownCoder() throws 
Exception {
 
     final TupleTag<Integer> mainOutputTag = new TupleTag<>("main");
     final TupleTag<TestDummy> additionalOutputTag = new 
TupleTag<>("unknownSide");
-    input.apply(ParDo.of(new TaggedOutputDummyFn(additionalOutputTag))
+    input.apply(ParDo.of(new TaggedOutputDummyFn(mainOutputTag, 
additionalOutputTag))
         .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
 
     thrown.expect(IllegalStateException.class);
@@ -1192,7 +1201,7 @@ public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
     final TupleTag<Integer> mainOutputTag = new TupleTag<>("main");
     final TupleTag<TestDummy> additionalOutputTag = new TupleTag<>("unregisteredSide");
     ParDo.MultiOutput<Integer, Integer> pardo =
-        ParDo.of(new TaggedOutputDummyFn(additionalOutputTag))
+        ParDo.of(new TaggedOutputDummyFn(mainOutputTag, additionalOutputTag))
             .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag));
     PCollectionTuple outputTuple = input.apply(pardo);
 
@@ -1219,7 +1228,7 @@ public void testMainOutputUnregisteredExplicitCoder() {
     final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additionalOutput") {};
     PCollectionTuple outputTuple =
         input.apply(
-            ParDo.of(new MainOutputDummyFn(additionalOutputTag))
+            ParDo.of(new MainOutputDummyFn(mainOutputTag, additionalOutputTag))
                 .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
 
     outputTuple.get(mainOutputTag).setCoder(new TestDummyCoder());
@@ -1243,8 +1252,7 @@ public void testMainOutputApplyTaggedOutputNoCoder() {
             .of(
                 new DoFn<TestDummy, TestDummy>() {
                   @ProcessElement
-                  public void processElement(ProcessContext context) {
-                    TestDummy element = context.element();
+                  public void processElement(ProcessContext context, @Element TestDummy element) {
                     context.output(element);
                     context.output(additionalOutputTag, element);
                   }
@@ -1306,11 +1314,12 @@ public void testParDoTaggedOutputWithTimestamp() {
                 ParDo.of(
                         new DoFn<Integer, Integer>() {
                           @ProcessElement
-                          public void processElement(ProcessContext c) {
-                            c.outputWithTimestamp(
-                                additionalOutputTag,
-                                c.element(),
-                                new Instant(c.element().longValue()));
+                          public void processElement(@Element Integer element,
+                                                     MultiOutputReceiver r) {
+                            r.get(additionalOutputTag)
+                                .outputWithTimestamp(
+                                element,
+                                new Instant(element.longValue()));
                           }
                         })
                     .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)))
@@ -1405,10 +1414,11 @@ public void testParDoShiftTimestampUnlimited() {
                 ParDo.of(
                     new DoFn<Long, Long>() {
                       @ProcessElement
-                      public void reassignTimestamps(ProcessContext context) {
+                      public void reassignTimestamps(ProcessContext context,
+                                                     @Element Long element) {
                         // Shift the latest element as far backwards in time as the model permits
                         context.outputWithTimestamp(
-                            context.element(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+                            element, BoundedWindow.TIMESTAMP_MIN_VALUE);
                       }
 
                       @Override
@@ -1479,10 +1489,12 @@ public void testWindowingInStartAndFinishBundle() {
                 ParDo.of(
                     new DoFn<String, String>() {
                       @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        c.output(c.element());
+                      public void processElement(@Element String element,
+                                                 @Timestamp Instant timestamp,
+                                                 OutputReceiver<String> r) {
+                        r.output(element);
                         System.out.println(
-                            "Process: " + c.element() + ":" + c.timestamp().getMillis());
+                            "Process: " + element + ":" + timestamp.getMillis());
                       }
 
                       @FinishBundle
@@ -1555,10 +1567,10 @@ public void testValueStateSimple() {
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+          public void processElement(@StateId(stateId) ValueState<Integer> state,
+                                     OutputReceiver<Integer> r) {
             Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-            c.output(currentValue);
+            r.output(currentValue);
             state.write(currentValue + 1);
           }
         };
@@ -1585,12 +1597,14 @@ public void testValueStateDedup() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> seenState) {
+              @Element KV<Integer, Integer> element,
+              @StateId(stateId) ValueState<Integer> seenState,
+              OutputReceiver<Integer> r) {
             Integer seen = MoreObjects.firstNonNull(seenState.read(), 0);
 
             if (seen == 0) {
               seenState.write(seen + 1);
-              c.output(c.element().getValue());
+              r.output(element.getValue());
             }
           }
         };
@@ -1737,9 +1751,10 @@ public void testValueStateCoderInference() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<MyInteger> state) {
+              ProcessContext c, @StateId(stateId) ValueState<MyInteger> state,
+              OutputReceiver<MyInteger> r) {
             MyInteger currentValue = MoreObjects.firstNonNull(state.read(), new MyInteger(0));
-            c.output(currentValue);
+            r.output(currentValue);
             state.write(new MyInteger(currentValue.getValue() + 1));
           }
         };
@@ -1766,10 +1781,10 @@ public void testValueStateCoderInferenceFailure() throws Exception {
               StateSpecs.value();
 
           @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<MyInteger> state) {
+          public void processElement(@StateId(stateId) ValueState<MyInteger> state,
+                                     OutputReceiver<MyInteger> r) {
             MyInteger currentValue = MoreObjects.firstNonNull(state.read(), new MyInteger(0));
-            c.output(currentValue);
+            r.output(currentValue);
             state.write(new MyInteger(currentValue.getValue() + 1));
           }
         };
@@ -1797,10 +1812,10 @@ public void testValueStateCoderInferenceFromInputCoder() {
               StateSpecs.value();
 
           @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<MyInteger> state) {
+          public void processElement(@StateId(stateId) ValueState<MyInteger> state,
+                                     OutputReceiver<MyInteger> r) {
             MyInteger currentValue = MoreObjects.firstNonNull(state.read(), new MyInteger(0));
-            c.output(currentValue);
+            r.output(currentValue);
             state.write(new MyInteger(currentValue.getValue() + 1));
           }
         };
@@ -1830,13 +1845,15 @@ public void testCoderInferenceOfList() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<List<MyInteger>> state) {
-            MyInteger myInteger = new MyInteger(c.element().getValue());
+              @Element KV<String, Integer> element,
+              @StateId(stateId) ValueState<List<MyInteger>> state,
+              OutputReceiver<List<MyInteger>> r) {
+            MyInteger myInteger = new MyInteger(element.getValue());
             List<MyInteger> currentValue = state.read();
             List<MyInteger> newValue = currentValue != null
                 ? ImmutableList.<MyInteger>builder().addAll(currentValue).add(myInteger).build()
                 : Collections.singletonList(myInteger);
-            c.output(newValue);
+            r.output(newValue);
             state.write(newValue);
           }
         };
@@ -1861,9 +1878,9 @@ public void testValueStateFixedWindows() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+              @StateId(stateId) ValueState<Integer> state, OutputReceiver<Integer> r) {
             Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-            c.output(currentValue);
+            r.output(currentValue);
             state.write(currentValue + 1);
           }
         };
@@ -1908,10 +1925,10 @@ public void testValueStateSameId() {
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+          public void processElement(@StateId(stateId) ValueState<Integer> state,
+                                     OutputReceiver<KV<String, Integer>> r) {
             Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-            c.output(KV.of("sizzle", currentValue));
+            r.output(KV.of("sizzle", currentValue));
             state.write(currentValue + 1);
           }
         };
@@ -1924,10 +1941,10 @@ public void processElement(
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
-          public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+          public void processElement(@StateId(stateId) ValueState<Integer> state,
+                                     OutputReceiver<Integer> r) {
             Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
-            c.output(currentValue);
+            r.output(currentValue);
             state.write(currentValue + 13);
           }
         };
@@ -1962,12 +1979,12 @@ public void testValueStateTaggedOutput() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+              @StateId(stateId) ValueState<Integer> state, MultiOutputReceiver r) {
             Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
             if (currentValue % 2 == 0) {
-              c.output(currentValue);
+              r.get(evenTag).output(currentValue);
             } else {
-              c.output(oddTag, currentValue);
+              r.get(oddTag).output(currentValue);
             }
             state.write(currentValue + 1);
           }
@@ -2009,9 +2026,11 @@ public void testBagState() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) BagState<Integer> state) {
+              @Element KV<String, Integer> element,
+              @StateId(stateId) BagState<Integer> state,
+              OutputReceiver<List<Integer>> r) {
             ReadableState<Boolean> isEmpty = state.isEmpty();
-            state.add(c.element().getValue());
+            state.add(element.getValue());
             assertFalse(isEmpty.read());
             Iterable<Integer> currentValue = state.read();
             if (Iterables.size(currentValue) >= 4) {
@@ -2022,7 +2041,7 @@ public void processElement(
 
               List<Integer> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
-              c.output(sorted);
+              r.output(sorted);
             }
           }
         };
@@ -2053,13 +2072,15 @@ public void testBagStateCoderInference() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) BagState<MyInteger> state) {
-            state.add(new MyInteger(c.element().getValue()));
+              @Element KV<String, Integer> element,
+              @StateId(stateId) BagState<MyInteger> state,
+              OutputReceiver<List<MyInteger>> r) {
+            state.add(new MyInteger(element.getValue()));
             Iterable<MyInteger> currentValue = state.read();
             if (Iterables.size(currentValue) >= 4) {
               List<MyInteger> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
-              c.output(sorted);
+              r.output(sorted);
             }
           }
         };
@@ -2091,13 +2112,15 @@ public void testBagStateCoderInferenceFailure() throws Exception {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) BagState<MyInteger> state) {
-            state.add(new MyInteger(c.element().getValue()));
+              @Element  KV<String, Integer> element,
+              @StateId(stateId) BagState<MyInteger> state,
+              OutputReceiver<List<MyInteger>> r) {
+            state.add(new MyInteger(element.getValue()));
             Iterable<MyInteger> currentValue = state.read();
             if (Iterables.size(currentValue) >= 4) {
               List<MyInteger> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
-              c.output(sorted);
+              r.output(sorted);
             }
           }
         };
@@ -2132,12 +2155,12 @@ public void testSetState() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c,
+              @Element KV<String, Integer> element,
               @StateId(stateId) SetState<Integer> state,
-              @StateId(countStateId) CombiningState<Integer, int[], Integer>
-                  count) {
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+              OutputReceiver<Set<Integer>> r) {
             ReadableState<Boolean> isEmpty = state.isEmpty();
-            state.add(c.element().getValue());
+            state.add(element.getValue());
             assertFalse(isEmpty.read());
             count.add(1);
             if (count.read() >= 4) {
@@ -2148,7 +2171,7 @@ public void processElement(
               assertEquals(4, Iterables.size(state.read()));
 
               Set<Integer> set = Sets.newHashSet(ints);
-              c.output(set);
+              r.output(set);
             }
           }
         };
@@ -2184,14 +2207,15 @@ public void testSetStateCoderInference() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c,
+              @Element KV<String, Integer> element,
               @StateId(stateId) SetState<MyInteger> state,
-              @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
-            state.add(new MyInteger(c.element().getValue()));
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+              OutputReceiver<Set<MyInteger>> r) {
+            state.add(new MyInteger(element.getValue()));
             count.add(1);
             if (count.read() >= 4) {
               Set<MyInteger> set = Sets.newHashSet(state.read());
-              c.output(set);
+              r.output(set);
             }
           }
         };
@@ -2227,14 +2251,15 @@ public void testSetStateCoderInferenceFailure() throws Exception {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c,
+              @Element KV<String, Integer> element,
               @StateId(stateId) SetState<MyInteger> state,
-              @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
-            state.add(new MyInteger(c.element().getValue()));
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+              OutputReceiver<Set<MyInteger>> r) {
+            state.add(new MyInteger(element.getValue()));
             count.add(1);
             if (count.read() >= 4) {
               Set<MyInteger> set = Sets.newHashSet(state.read());
-              c.output(set);
+              r.output(set);
             }
           }
         };
@@ -2269,10 +2294,12 @@ public void testMapState() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) MapState<String, Integer> state,
-              @StateId(countStateId) CombiningState<Integer, int[], Integer>
-                  count) {
-            KV<String, Integer> value = c.element().getValue();
+              ProcessContext c,
+              @Element KV<String, KV<String, Integer>> element,
+              @StateId(stateId) MapState<String, Integer> state,
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+              OutputReceiver<KV<String, Integer>> r) {
+            KV<String, Integer> value = element.getValue();
             ReadableState<Iterable<Entry<String, Integer>>> entriesView = state.entries();
             state.put(value.getKey(), value.getValue());
             count.add(1);
@@ -2286,7 +2313,7 @@ public void processElement(
               assertEquals(4, Iterables.size(state.entries().read()));
 
               for (Map.Entry<String, Integer> entry : iterate) {
-                c.output(KV.of(entry.getKey(), entry.getValue()));
+                r.output(KV.of(entry.getKey(), entry.getValue()));
               }
             }
           }
@@ -2324,16 +2351,17 @@ public void testMapStateCoderInference() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-              @StateId(countStateId) CombiningState<Integer, int[], Integer>
-                  count) {
-            KV<String, Integer> value = c.element().getValue();
+              @Element KV<String, KV<String, Integer>> element,
+              @StateId(stateId) MapState<String, MyInteger> state,
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+              OutputReceiver<KV<String, MyInteger>> r) {
+            KV<String, Integer> value = element.getValue();
             state.put(value.getKey(), new MyInteger(value.getValue()));
             count.add(1);
             if (count.read() >= 4) {
               Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
               for (Map.Entry<String, MyInteger> entry : iterate) {
-                c.output(KV.of(entry.getKey(), entry.getValue()));
+                r.output(KV.of(entry.getKey(), entry.getValue()));
               }
             }
           }
@@ -2371,16 +2399,18 @@ public void testMapStateCoderInferenceFailure() throws Exception {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-              @StateId(countStateId) CombiningState<Integer, int[], Integer>
-                  count) {
-            KV<String, Integer> value = c.element().getValue();
+              ProcessContext c,
+              @Element KV<String, KV<String, Integer>> element,
+              @StateId(stateId) MapState<String, MyInteger> state,
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+              OutputReceiver<KV<String, MyInteger>> r) {
+            KV<String, Integer> value = element.getValue();
             state.put(value.getKey(), new MyInteger(value.getValue()));
             count.add(1);
             if (count.read() >= 4) {
               Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
               for (Map.Entry<String, MyInteger> entry : iterate) {
-                c.output(KV.of(entry.getKey(), entry.getValue()));
+                r.output(KV.of(entry.getKey(), entry.getValue()));
               }
             }
           }
@@ -2415,11 +2445,13 @@ public void testCombiningState() {
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state) {
-            state.add(c.element().getValue());
+              @Element KV<String, Double> element,
+              @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state,
+              OutputReceiver<String> r) {
+            state.add(element.getValue());
             Double currentValue = state.read();
             if (Math.abs(currentValue - 0.5) < EPSILON) {
-              c.output("right on");
+              r.output("right on");
             }
           }
         };
@@ -2476,12 +2508,13 @@ public Integer extractOutput(MyInteger accumulator) {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c,
-              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
-            state.add(c.element().getValue());
+              @Element KV<String, Integer> element,
+              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state,
+              OutputReceiver<String> r) {
+            state.add(element.getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
-              c.output("right on");
+              r.output("right on");
             }
           }
         };
@@ -2536,12 +2569,13 @@ public Integer extractOutput(MyInteger accumulator) {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c,
-              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
-            state.add(c.element().getValue());
+              @Element KV<String, Integer> element,
+              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state,
+              OutputReceiver<String> r) {
+            state.add(element.getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
-              c.output("right on");
+              r.output("right on");
             }
           }
         };
@@ -2570,12 +2604,14 @@ public void testCombiningStateParameterSuperclass() {
               StateSpecs.combining(Sum.ofIntegers());
 
           @ProcessElement
-          public void processElement(ProcessContext c,
-              @StateId(stateId) GroupingState<Integer, Integer> state) {
-            state.add(c.element().getValue());
+          public void processElement(
+              @Element KV<Integer, Integer> element,
+              @StateId(stateId) GroupingState<Integer, Integer> state,
+              OutputReceiver<String> r) {
+            state.add(element.getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
-              c.output("right on");
+              r.output("right on");
             }
           }
         };
@@ -2607,17 +2643,20 @@ public void testBagStateSideInput() {
 
           @ProcessElement
           public void processElement(
-              ProcessContext c, @StateId(stateId) BagState<Integer> state) {
-            state.add(c.element().getValue());
+              ProcessContext c,
+              @Element KV<String, Integer>  element,
+              @StateId(stateId) BagState<Integer> state,
+              OutputReceiver<List<Integer>> r) {
+            state.add(element.getValue());
             Iterable<Integer> currentValue = state.read();
             if (Iterables.size(currentValue) >= 4) {
               List<Integer> sorted = Lists.newArrayList(currentValue);
               Collections.sort(sorted);
-              c.output(sorted);
+              r.output(sorted);
 
               List<Integer> sideSorted = Lists.newArrayList(c.sideInput(listView));
               Collections.sort(sideSorted);
-              c.output(sideSorted);
+              r.output(sideSorted);
             }
           }
         };
@@ -2663,14 +2702,16 @@ public void testEventTimeTimerBounded() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) {
             timer.offset(Duration.standardSeconds(1)).setRelative();
-            context.output(3);
+            r.output(3);
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(42);
+          public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+            if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+              r.output(42);
+            }
           }
         };
 
@@ -2696,14 +2737,16 @@ public void testGbkFollowedByUserTimers() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+          public void processElement(@TimerId(TIMER_ID) Timer timer, OutputReceiver<Integer> r) {
             timer.offset(Duration.standardSeconds(1)).setRelative();
-            context.output(3);
+            r.output(3);
           }
 
           @OnTimer(TIMER_ID)
-          public void onTimer(OnTimerContext context) {
-            context.output(42);
+          public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+            if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+              r.output(42);
+            }
           }
         };
 
@@ -2728,14 +2771,17 @@ public void testEventTimeTimerAlignBounded() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer,
+                                     @Timestamp Instant timestamp,
+                                     OutputReceiver<KV<Integer, Instant>> r) {
             timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
-            context.output(KV.of(3, context.timestamp()));
+            r.output(KV.of(3, timestamp));
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(KV.of(42, context.timestamp()));
+          public void onTimer(@Timestamp Instant timestamp,
+                              OutputReceiver<KV<Integer, Instant>> r) {
+            r.output(KV.of(42, timestamp));
           }
         };
 
@@ -2758,13 +2804,13 @@ public void testTimerReceivedInOriginalWindow() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer) {
             timer.offset(Duration.standardSeconds(1)).setRelative();
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context, BoundedWindow window) {
-            context.output(context.window());
+          public void onTimer(BoundedWindow window, OutputReceiver<BoundedWindow> r) {
+            r.output(window);
           }
 
           public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
@@ -2806,15 +2852,15 @@ public void testEventTimeTimerAbsolute() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(
-              ProcessContext context, @TimerId(timerId) Timer timer, BoundedWindow window) {
+          public void processElement(@TimerId(timerId) Timer timer, BoundedWindow window,
+                                     OutputReceiver<Integer> r) {
             timer.set(window.maxTimestamp());
-            context.output(3);
+            r.output(3);
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(42);
+          public void onTimer(OutputReceiver<Integer> r) {
+            r.output(42);
           }
         };
 
@@ -2850,12 +2896,12 @@ public void processElement(
 
           @OnTimer(timerId)
           public void onLoopTimer(
-              OnTimerContext ctx,
               @StateId(stateId) ValueState<Integer> countState,
-              @TimerId(timerId) Timer loopTimer) {
+              @TimerId(timerId) Timer loopTimer,
+              OutputReceiver<Integer> r) {
             int count = MoreObjects.firstNonNull(countState.read(), 0);
             if (count < loopCount) {
-              ctx.output(count);
+              r.output(count);
               countState.write(count + 1);
               loopTimer.offset(Duration.millis(1)).setRelative();
             }
@@ -2904,8 +2950,9 @@ public void processElement(
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) {
-            context.output(KV.of(state.read(), timerOutput));
+          public void onTimer(@StateId(stateId) ValueState<String> state,
+                              OutputReceiver<KV<String, Integer>> r) {
+            r.output(KV.of(state.read(), timerOutput));
           }
         };
 
@@ -2948,12 +2995,12 @@ public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer) {
             timer.set(new Instant(0));
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {}
+          public void onTimer() {}
         };
 
     PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
@@ -2983,7 +3030,7 @@ public void processElement(
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {}
+          public void onTimer() {}
         };
 
     PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
@@ -3007,14 +3054,16 @@ public void testSimpleProcessingTimerTimer() throws Exception {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) {
             timer.offset(Duration.standardSeconds(1)).setRelative();
-            context.output(3);
+            r.output(3);
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(42);
+          public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) {
+            if (timeDomain.equals(TimeDomain.PROCESSING_TIME)) {
+              r.output(42);
+            }
           }
         };
 
@@ -3041,14 +3090,14 @@ public void testEventTimeTimerUnbounded() throws 
Exception {
           private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) 
Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
             timer.offset(Duration.standardSeconds(1)).setRelative();
-            context.output(3);
+            r.output(3);
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(42);
+          public void onTimer(OutputReceiver<Integer> r) {
+            r.output(42);
           }
         };
 
@@ -3076,14 +3125,17 @@ public void testEventTimeTimerAlignUnbounded() throws 
Exception {
           private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @ProcessElement
-          public void processElement(ProcessContext context, @TimerId(timerId) 
Timer timer) {
+          public void processElement(@TimerId(timerId) Timer timer,
+                                     @Timestamp Instant timestamp,
+                                     OutputReceiver<KV<Integer, Instant>> r) {
             
timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
-            context.output(KV.of(3, context.timestamp()));
+            r.output(KV.of(3, timestamp));
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(KV.of(42, context.timestamp()));
+          public void onTimer(@Timestamp Instant timestamp,
+                              OutputReceiver<KV<Integer, Instant>> r) {
+            r.output(KV.of(42, timestamp));
           }
         };
 
@@ -3119,8 +3171,9 @@ public void processElement(ProcessContext context, 
@TimerId(timerId) Timer timer
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output(KV.of(42, context.timestamp()));
+          public void onTimer(@Timestamp Instant timestamp,
+                              OutputReceiver<KV<Integer, Instant>> r) {
+            r.output(KV.of(42, timestamp));
           }
         };
 
@@ -3161,8 +3214,8 @@ public void processElement(ProcessContext context, 
@TimerId(timerId) Timer timer
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output("timer_output");
+          public void onTimer(OutputReceiver<String> r) {
+            r.output("timer_output");
           }
         };
 
@@ -3202,8 +3255,8 @@ public void processElement(ProcessContext context, 
@TimerId(timerId) Timer timer
           }
 
           @OnTimer(timerId)
-          public void onTimer(OnTimerContext context) {
-            context.output("timer_output");
+          public void onTimer(OutputReceiver<String> r) {
+            r.output("timer_output");
           }
         };
 
@@ -3309,8 +3362,8 @@ public void testPipelineOptionsParameter() {
             ParDo.of(
                 new DoFn<Integer, String>() {
                   @ProcessElement
-                  public void process(ProcessContext c, PipelineOptions 
options) {
-                    c.output(options.as(MyOptions.class).getFakeOption());
+                  public void process(OutputReceiver<String> r, 
PipelineOptions options) {
+                    r.output(options.as(MyOptions.class).getFakeOption());
                   }
                 }));
 
@@ -3342,8 +3395,8 @@ public void process(
                       }
 
                       @OnTimer(timerId)
-                      public void onTimer(OnTimerContext c, PipelineOptions 
options) {
-                        c.output(options.as(MyOptions.class).getFakeOption());
+                      public void onTimer(OutputReceiver<String> r, 
PipelineOptions options) {
+                        r.output(options.as(MyOptions.class).getFakeOption());
                       }
                     }));
 
@@ -3382,8 +3435,8 @@ public void process(ProcessContext c,
     }
 
     @OnTimer("timer")
-    public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
-      c.output("It works");
+    public void onTimer(OutputReceiver<String> r, @TimerId("timer") Timer 
timer) {
+      r.output("It works");
     }
   }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 2c1575ab64c..20ae58742c2 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -49,6 +49,8 @@
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import 
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
@@ -77,13 +79,30 @@
   @Mock private DoFn<String, String>.StartBundleContext mockStartBundleContext;
   @Mock private DoFn<String, String>.FinishBundleContext 
mockFinishBundleContext;
   @Mock private DoFn<String, String>.ProcessContext mockProcessContext;
+  private String mockElement;
+  private Instant mockTimestamp;
+  @Mock private OutputReceiver<String> mockOutputReceiver;
+  @Mock private MultiOutputReceiver mockMultiOutputReceiver;
   @Mock private IntervalWindow mockWindow;
+ // @Mock private PaneInfo mockPaneInfo;
   @Mock private DoFnInvoker.ArgumentProvider<String, String> 
mockArgumentProvider;
 
   @Before
   public void setUp() {
+    mockElement =  new String("element");
+    mockTimestamp = new Instant(0);
     MockitoAnnotations.initMocks(this);
     when(mockArgumentProvider.window()).thenReturn(mockWindow);
+   // when(mockArgumentProvider.paneInfo(Matchers.<DoFn>any()))
+   //     .thenReturn(mockPaneInfo);
+    when(mockArgumentProvider.element(Matchers.<DoFn>any()))
+       .thenReturn(mockElement);
+    when(mockArgumentProvider.timestamp(Matchers.<DoFn>any()))
+        .thenReturn(mockTimestamp);
+    when(mockArgumentProvider.outputReceiver(Matchers.<DoFn>any()))
+        .thenReturn(mockOutputReceiver);
+    when(mockArgumentProvider.taggedOutputReceiver(Matchers.<DoFn>any()))
+        .thenReturn(mockMultiOutputReceiver);
     when(mockArgumentProvider.startBundleContext(Matchers.<DoFn>any()))
         .thenReturn(mockStartBundleContext);
     when(mockArgumentProvider.finishBundleContext(Matchers.<DoFn>any()))
@@ -187,6 +206,25 @@ public void processElement(ProcessContext c, 
IntervalWindow w) throws Exception
     verify(fn).processElement(mockProcessContext, mockWindow);
   }
 
+  @Test
+  public void testDoFnWithAllParameters() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public void processElement(ProcessContext c,
+                                 @Element String element,
+                                 @Timestamp Instant timestamp,
+                                 IntervalWindow w,
+                         //        PaneInfo p,
+                                 OutputReceiver<String> receiver,
+                                 MultiOutputReceiver multiReceiver) throws 
Exception {}
+    }
+
+    MockFn fn = mock(MockFn.class);
+    assertEquals(stop(), invokeProcessElement(fn));
+    verify(fn).processElement(mockProcessContext, mockElement, mockTimestamp, 
mockWindow,
+        mockOutputReceiver, mockMultiOutputReceiver);
+  }
+
   /**
    * Tests that the generated {@link DoFnInvoker} passes the state parameter 
that it
    * should.
@@ -378,7 +416,18 @@ public void splitRestriction(
     assertEquals(coder, 
invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
     assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
     final List<SomeRestriction> outputs = new ArrayList<>();
-    invoker.invokeSplitRestriction("blah", restriction, outputs::add);
+    invoker.invokeSplitRestriction("blah", restriction,
+        new OutputReceiver<SomeRestriction>() {
+          @Override
+          public void output(SomeRestriction output) {
+            outputs.add(output);
+          }
+
+          @Override
+          public void outputWithTimestamp(SomeRestriction output, Instant 
timestamp) {
+            outputs.add(output);
+          }
+        });
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(tracker, invoker.invokeNewTracker(restriction));
     assertEquals(
@@ -474,6 +523,13 @@ public void output(String output) {
             invoked = true;
             assertEquals("foo", output);
           }
+
+          @Override
+          public void outputWithTimestamp(String output, Instant instant) {
+            assertFalse(invoked);
+            invoked = true;
+            assertEquals("foo", output);
+          }
         });
     assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider));
     assertThat(
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 0377b446f0e..0d5f7f376e6 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -44,16 +44,25 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PipelineOptionsParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TaggedOutputReceiverParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomainParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -67,7 +76,7 @@
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testBasicDoFn() throws Exception {
+  public void testBasicDoFnProcessContext() throws Exception {
     DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
       @ProcessElement
       public void process(ProcessContext c) {}
@@ -78,6 +87,75 @@ public void process(ProcessContext c) {}
         sig.processElement().extraParameters().get(0), 
instanceOf(ProcessContextParameter.class));
   }
 
+  @Test
+  public void testBasicDoFnAllParameters() throws Exception {
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
+      @ProcessElement
+      public void process(@Element String element, @Timestamp Instant 
timestamp,
+                          BoundedWindow window, PaneInfo paneInfo,
+                          OutputReceiver<String> receiver,
+                          PipelineOptions options) {}
+    }.getClass());
+
+    assertThat(sig.processElement().extraParameters().size(), equalTo(6));
+    assertThat(
+        sig.processElement().extraParameters().get(0), 
instanceOf(ElementParameter.class));
+    assertThat(
+        sig.processElement().extraParameters().get(1), 
instanceOf(TimestampParameter.class));
+    assertThat(
+        sig.processElement().extraParameters().get(2), 
instanceOf(WindowParameter.class));
+    assertThat(
+        sig.processElement().extraParameters().get(3), 
instanceOf(PaneInfoParameter.class));
+    assertThat(
+        sig.processElement().extraParameters().get(4), 
instanceOf(OutputReceiverParameter.class));
+    assertThat(
+        sig.processElement().extraParameters().get(5), 
instanceOf(PipelineOptionsParameter.class));
+  }
+
+  @Test
+  public void testBasicDoFnMultiOutputReceiver() throws Exception {
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
+      @ProcessElement
+      public void process(MultiOutputReceiver receiver) {}
+    }.getClass());
+
+    assertThat(sig.processElement().extraParameters().size(), equalTo(1));
+    assertThat(
+        sig.processElement().extraParameters().get(0),
+        instanceOf(TaggedOutputReceiverParameter.class));
+  }
+
+
+  @Test
+  public void testWrongElementType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("@Element argument must have type java.lang.String");
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
+      @ProcessElement
+      public void process(@Element Integer element) {}
+    }.getClass());
+  }
+
+  @Test
+  public void testWrongTimestampType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("@Timestamp argument must have type 
org.joda.time.Instant");
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
+      @ProcessElement
+      public void process(@Timestamp String timestamp) {}
+    }.getClass());
+  }
+
+  @Test
+  public void testWrongOutputReceiverType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("OutputReceiver should be parameterized by 
java.lang.String");
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
+      @ProcessElement
+      public void process(OutputReceiver<Integer> receiver) {}
+    }.getClass());
+  }
+
   @Test
   public void testRequiresStableInputProcessElement() throws Exception {
     DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
@@ -344,6 +422,35 @@ public void onTimer(BoundedWindow w) {}
         instanceOf(WindowParameter.class));
   }
 
+  @Test
+  public void testAllParamsOnTimer() throws Exception {
+    final String timerId = "some-timer-id";
+
+    DoFnSignature sig =
+        DoFnSignatures.getSignature(new DoFn<String, String>() {
+          @TimerId(timerId)
+          private final TimerSpec myfield1 = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void process(ProcessContext c) {}
+
+          @OnTimer(timerId)
+          public void onTimer(@Timestamp Instant timestamp, TimeDomain 
timeDomain,
+                              BoundedWindow w) {}
+        }.getClass());
+
+    assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), 
equalTo(3));
+    assertThat(
+        sig.onTimerMethods().get(timerId).extraParameters().get(0),
+        instanceOf(TimestampParameter.class));
+    assertThat(
+        sig.onTimerMethods().get(timerId).extraParameters().get(1),
+        instanceOf(TimeDomainParameter.class));
+    assertThat(
+        sig.onTimerMethods().get(timerId).extraParameters().get(2),
+        instanceOf(WindowParameter.class));
+  }
+
   @Test
   public void testPipelineOptionsParameter() throws Exception {
     DoFnSignature sig =
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index b899c16767a..59a23872316 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -79,8 +79,11 @@
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -491,6 +494,12 @@ public BoundedWindow window() {
           "Cannot access window outside of @ProcessElement and @OnTimer 
methods.");
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
     @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
         DoFn<InputT, OutputT> doFn) {
@@ -510,6 +519,35 @@ public BoundedWindow window() {
           "Cannot access ProcessContext outside of @ProcessElement method.");
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access element outside of @ProcessElement method.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access timestamp outside of @ProcessElement method.");
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access time domain outside of @ProcessTimer method.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access output receiver outside of @ProcessElement method."); 
   }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access output reveiver outside of @ProcessElement method.");
+    }
+
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
@@ -551,6 +589,11 @@ public BoundedWindow window() {
       return currentWindow;
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      return pane();
+    }
+
     @Override
     public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> 
doFn) {
       throw new UnsupportedOperationException(
@@ -568,6 +611,32 @@ public ProcessContext processContext(DoFn<InputT, OutputT> 
doFn) {
       return this;
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      return element();
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      return timestamp();
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access time domain outside of @ProcessTimer method.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.windowedReceiver(this, null);
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      return DoFnOutputReceivers.windowedMultiReceiver(this);
+    }
+
     @Override
     public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("TODO: Add support for timers");
@@ -707,6 +776,12 @@ public BoundedWindow window() {
           "Cannot access window outside of @ProcessElement and @OnTimer 
methods.");
     }
 
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
     @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
         DoFn<InputT, OutputT> doFn) {
@@ -726,6 +801,35 @@ public BoundedWindow window() {
           "Cannot access ProcessContext outside of @ProcessElement method.");
     }
 
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access element outside of @ProcessElement method.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access timestamp outside of @ProcessElement method.");
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access time domain outside of @ProcessTimer method.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access output receiver outside of @ProcessElement method."); 
   }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access output reveiver outside of @ProcessElement method.");
+    }
+
     @Override
     public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 95084)
    Time Spent: 4h 10m  (was: 4h)

> New DoFn should allow injecting of all parameters in ProcessContext
> -------------------------------------------------------------------
>
>                 Key: BEAM-3979
>                 URL: https://issues.apache.org/jira/browse/BEAM-3979
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.4.0
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> This was intended in the past, but never completed. Ideally all primitive 
> parameters in ProcessContext should be injectable, and OutputReceiver 
> parameters can be used to collect output. So, we should be able to write a 
> DoFn as follows:
> @ProcessElement
> public void process(@Element String word, OutputReceiver<String> receiver) {
>   receiver.output(word.toUpperCase());
> }



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to