Repository: beam
Updated Branches:
  refs/heads/master 77b136603 -> 38208eaa4


Removes inputProvider() and outputReceiver()

Removes InputProvider itself too.
Does not remove OutputReceiver because it is still used in the
@SplitRestriction method.
Cleans up tests that looked at InputProvider/OutputReceiver parameters;
they now look at the DoFn.ProcessContext parameter instead. Also improves
the formatting of parameter types.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d787bdd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d787bdd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d787bdd

Branch: refs/heads/master
Commit: 7d787bddc80fa832576407c313b26d5436dfa393
Parents: 34b4a6d
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Jan 27 14:56:56 2017 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Jan 27 15:00:19 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnAdapters.java  | 20 -----
 .../beam/runners/core/SimpleDoFnRunner.java     | 32 --------
 .../beam/runners/core/SplittableParDo.java      | 12 ---
 .../org/apache/beam/sdk/transforms/DoFn.java    |  6 --
 .../apache/beam/sdk/transforms/DoFnTester.java  | 12 ---
 .../reflect/ByteBuddyDoFnInvokerFactory.java    | 12 ---
 .../sdk/transforms/reflect/DoFnInvoker.java     | 18 -----
 .../sdk/transforms/reflect/DoFnSignature.java   | 56 -------------
 .../sdk/transforms/reflect/DoFnSignatures.java  | 82 +++++---------------
 .../beam/sdk/util/common/ReflectHelpers.java    | 16 ++--
 .../transforms/reflect/DoFnInvokersTest.java    | 26 -------
 .../DoFnSignaturesProcessElementTest.java       | 40 ++++------
 .../DoFnSignaturesSplittableDoFnTest.java       |  3 +-
 .../transforms/reflect/DoFnSignaturesTest.java  |  6 +-
 14 files changed, 51 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 23aba58..dcd7969 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -204,16 +204,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for 
testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only 
for testing");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
       throw new UnsupportedOperationException("This is a non-splittable DoFn");
     }
@@ -316,16 +306,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for 
testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only 
for testing");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
       throw new UnsupportedOperationException("This is a non-splittable DoFn");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index df5f3f6..d54daf6 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -34,9 +34,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.InputProvider;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -439,16 +437,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("InputProvider is for testing 
only.");
-    }
-
-    @Override
-    public OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("OutputReceiver is for testing 
only.");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
       throw new UnsupportedOperationException(
           "Cannot access RestrictionTracker outside of @ProcessElement 
method.");
@@ -626,16 +614,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("InputProvider parameters are 
not supported.");
-    }
-
-    @Override
-    public OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("OutputReceiver parameters are 
not supported.");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
     }
@@ -744,16 +722,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("InputProvider parameters are 
not supported.");
-    }
-
-    @Override
-    public OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("OutputReceiver parameters are 
not supported.");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index f8d12ec..d1cbf8f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -672,18 +672,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       }
 
       @Override
-      public DoFn.InputProvider<InputT> inputProvider() {
-        // DoFnSignatures should have verified that this DoFn doesn't access 
extra context.
-        throw new IllegalStateException("Unexpected extra context access on a 
splittable DoFn");
-      }
-
-      @Override
-      public DoFn.OutputReceiver<OutputT> outputReceiver() {
-        // DoFnSignatures should have verified that this DoFn doesn't access 
extra context.
-        throw new IllegalStateException("Unexpected extra context access on a 
splittable DoFn");
-      }
-
-      @Override
       public TrackerT restrictionTracker() {
         return tracker;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 699403f..a161919 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -372,12 +372,6 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   public interface OutputReceiver<T> {
     void output(T output);
   }
-
-  /** Provides a single value of the given type. */
-  public interface InputProvider<T> {
-    T get();
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b2c3fd5..0d1f96d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -315,18 +315,6 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
             }
 
             @Override
-            public DoFn.InputProvider<InputT> inputProvider() {
-              throw new UnsupportedOperationException(
-                  "Not expected to access InputProvider from DoFnTester");
-            }
-
-            @Override
-            public DoFn.OutputReceiver<OutputT> outputReceiver() {
-              throw new UnsupportedOperationException(
-                  "Not expected to access OutputReceiver from DoFnTester");
-            }
-
-            @Override
             public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular 
DoFn in DoFnTester");

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 01ddd86..46b21d6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -68,9 +68,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
@@ -572,16 +570,6 @@ public class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
           }
 
           @Override
-          public StackManipulation dispatch(InputProviderParameter p) {
-            return 
simpleExtraContextParameter(INPUT_PROVIDER_PARAMETER_METHOD);
-          }
-
-          @Override
-          public StackManipulation dispatch(OutputReceiverParameter p) {
-            return 
simpleExtraContextParameter(OUTPUT_RECEIVER_PARAMETER_METHOD);
-          }
-
-          @Override
           public StackManipulation dispatch(RestrictionTrackerParameter p) {
             // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a 
RestrictionTracker,
             // but the @ProcessElement method expects a concrete subtype of it.

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 354578e..5f61349 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -21,8 +21,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFn.InputProvider;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
@@ -113,12 +111,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link 
DoFn}. */
     DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> 
doFn);
 
-    /** A placeholder for testing purposes. */
-    InputProvider<InputT> inputProvider();
-
-    /** A placeholder for testing purposes. */
-    OutputReceiver<OutputT> outputReceiver();
-
     /**
      * If this is a splittable {@link DoFn}, returns the {@link 
RestrictionTracker} associated with
      * the current {@link ProcessElement} call.
@@ -155,16 +147,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
-    public InputProvider<InputT> inputProvider() {
-      return null;
-    }
-
-    @Override
-    public OutputReceiver<OutputT> outputReceiver() {
-      return null;
-    }
-
-    @Override
     public State state(String stateId) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index f470782..007d8be 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -28,8 +28,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.InputProvider;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
@@ -190,10 +188,6 @@ public abstract class DoFnSignature {
         return cases.dispatch((WindowParameter) this);
       } else if (this instanceof RestrictionTrackerParameter) {
         return cases.dispatch((RestrictionTrackerParameter) this);
-      } else if (this instanceof InputProviderParameter) {
-        return cases.dispatch((InputProviderParameter) this);
-      } else if (this instanceof OutputReceiverParameter) {
-        return cases.dispatch((OutputReceiverParameter) this);
       } else if (this instanceof StateParameter) {
         return cases.dispatch((StateParameter) this);
       } else if (this instanceof TimerParameter) {
@@ -213,8 +207,6 @@ public abstract class DoFnSignature {
       ResultT dispatch(ProcessContextParameter p);
       ResultT dispatch(OnTimerContextParameter p);
       ResultT dispatch(WindowParameter p);
-      ResultT dispatch(InputProviderParameter p);
-      ResultT dispatch(OutputReceiverParameter p);
       ResultT dispatch(RestrictionTrackerParameter p);
       ResultT dispatch(StateParameter p);
       ResultT dispatch(TimerParameter p);
@@ -247,16 +239,6 @@ public abstract class DoFnSignature {
         }
 
         @Override
-        public ResultT dispatch(InputProviderParameter p) {
-          return dispatchDefault(p);
-        }
-
-        @Override
-        public ResultT dispatch(OutputReceiverParameter p) {
-          return dispatchDefault(p);
-        }
-
-        @Override
         public ResultT dispatch(RestrictionTrackerParameter p) {
           return dispatchDefault(p);
         }
@@ -280,10 +262,6 @@ public abstract class DoFnSignature {
           new AutoValue_DoFnSignature_Parameter_ProcessContextParameter();
     private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter();
-    private static final InputProviderParameter INPUT_PROVIDER_PARAMETER =
-        new AutoValue_DoFnSignature_Parameter_InputProviderParameter();
-    private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER =
-        new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter();
 
     /** Returns a {@link ContextParameter}. */
     public static ContextParameter context() {
@@ -306,20 +284,6 @@ public abstract class DoFnSignature {
     }
 
     /**
-     * Returns an {@link InputProviderParameter}.
-     */
-    public static InputProviderParameter inputProvider() {
-      return INPUT_PROVIDER_PARAMETER;
-    }
-
-    /**
-     * Returns an {@link OutputReceiverParameter}.
-     */
-    public static OutputReceiverParameter outputReceiver() {
-      return OUTPUT_RECEIVER_PARAMETER;
-    }
-
-    /**
      * Returns a {@link RestrictionTrackerParameter}.
      */
     public static RestrictionTrackerParameter 
restrictionTracker(TypeDescriptor<?> trackerT) {
@@ -378,26 +342,6 @@ public abstract class DoFnSignature {
     }
 
     /**
-     * Descriptor for a {@link Parameter} of type {@link InputProvider}.
-     *
-     * <p>All such descriptors are equal.
-     */
-    @AutoValue
-    public abstract static class InputProviderParameter extends Parameter {
-      InputProviderParameter() {}
-    }
-
-    /**
-     * Descriptor for a {@link Parameter} of type {@link OutputReceiver}.
-     *
-     * <p>All such descriptors are equal.
-     */
-    @AutoValue
-    public abstract static class OutputReceiverParameter extends Parameter {
-      OutputReceiverParameter() {}
-    }
-
-    /**
      * Descriptor for a {@link Parameter} of a subclass of {@link 
RestrictionTracker}.
      *
      * <p>All such descriptors are equal.

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index b6b764e..61b9157 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -80,9 +80,7 @@ public class DoFnSignatures {
           Parameter.ProcessContextParameter.class,
           Parameter.WindowParameter.class,
           Parameter.TimerParameter.class,
-          Parameter.StateParameter.class,
-          Parameter.InputProviderParameter.class,
-          Parameter.OutputReceiverParameter.class);
+          Parameter.StateParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
@@ -617,25 +615,6 @@ public class DoFnSignatures {
         .where(new TypeParameter<OutputT>() {}, outputT);
   }
 
-  /**
-   * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} 
given {@code InputT}.
-   */
-  private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> 
inputProviderTypeOf(
-      TypeDescriptor<InputT> inputT) {
-    return new TypeDescriptor<DoFn.InputProvider<InputT>>() {}.where(
-        new TypeParameter<InputT>() {}, inputT);
-  }
-
-  /**
-   * Generates a {@link TypeDescriptor} for {@code 
DoFn.OutputReceiver<OutputT>} given {@code
-   * OutputT}.
-   */
-  private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> 
outputReceiverTypeOf(
-      TypeDescriptor<OutputT> inputT) {
-    return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where(
-        new TypeParameter<OutputT>() {}, inputT);
-  }
-
   @VisibleForTesting
   static DoFnSignature.OnTimerMethod analyzeOnTimerMethod(
       ErrorReporter errors,
@@ -767,8 +746,6 @@ public class DoFnSignatures {
     TypeDescriptor<?> expectedProcessContextT = 
doFnProcessContextTypeOf(inputT, outputT);
     TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT);
     TypeDescriptor<?> expectedOnTimerContextT = 
doFnOnTimerContextTypeOf(inputT, outputT);
-    TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
-    TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
 
     TypeDescriptor<?> paramT = param.getType();
     Class<?> rawType = paramT.getRawType();
@@ -776,51 +753,27 @@ public class DoFnSignatures {
     ErrorReporter paramErrors = methodErrors.forParameter(param);
 
     if (rawType.equals(DoFn.ProcessContext.class)) {
-      methodErrors.checkArgument(paramT.equals(expectedProcessContextT),
-        "Must take %s as the ProcessContext argument",
+      paramErrors.checkArgument(paramT.equals(expectedProcessContextT),
+        "ProcessContext argument must have type %s",
         formatType(expectedProcessContextT));
       return Parameter.processContext();
     } else if (rawType.equals(DoFn.Context.class)) {
-      methodErrors.checkArgument(paramT.equals(expectedContextT),
-          "Must take %s as the Context argument",
+      paramErrors.checkArgument(paramT.equals(expectedContextT),
+          "Context argument must have type %s",
           formatType(expectedContextT));
       return Parameter.context();
     } else if (rawType.equals(DoFn.OnTimerContext.class)) {
-        methodErrors.checkArgument(paramT.equals(expectedOnTimerContextT),
-            "Must take %s as the OnTimerContext argument",
-            formatType(expectedOnTimerContextT));
-        return Parameter.onTimerContext();
+      paramErrors.checkArgument(
+          paramT.equals(expectedOnTimerContextT),
+          "OnTimerContext argument must have type %s",
+          formatType(expectedOnTimerContextT));
+      return Parameter.onTimerContext();
     } else if (BoundedWindow.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasWindowParameter(),
           "Multiple %s parameters",
           BoundedWindow.class.getSimpleName());
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) 
paramT);
-    } else if (rawType.equals(DoFn.InputProvider.class)) {
-      methodErrors.checkArgument(
-          
!methodContext.getExtraParameters().contains(Parameter.inputProvider()),
-          "Multiple %s parameters",
-          DoFn.InputProvider.class.getSimpleName());
-      paramErrors.checkArgument(
-          paramT.equals(expectedInputProviderT),
-          "%s is for %s when it should be %s",
-          DoFn.InputProvider.class.getSimpleName(),
-          formatType(paramT),
-          formatType(expectedInputProviderT));
-      return Parameter.inputProvider();
-    } else if (rawType.equals(DoFn.OutputReceiver.class)) {
-      methodErrors.checkArgument(
-          
!methodContext.getExtraParameters().contains(Parameter.outputReceiver()),
-          "Multiple %s parameters",
-          DoFn.OutputReceiver.class.getSimpleName());
-      paramErrors.checkArgument(
-          paramT.equals(expectedOutputReceiverT),
-          "%s is for %s when it should be %s",
-          DoFn.OutputReceiver.class.getSimpleName(),
-          formatType(paramT),
-          formatType(expectedOutputReceiverT));
-      return Parameter.outputReceiver();
-
     } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasRestrictionTrackerParameter(),
@@ -890,7 +843,7 @@ public class DoFnSignatures {
           "reference to %s %s with different type %s",
           StateId.class.getSimpleName(),
           id,
-          stateDecl.stateType());
+          formatType(stateDecl.stateType()));
 
       paramErrors.checkArgument(
           
stateDecl.field().getDeclaringClass().equals(param.getMethod().getDeclaringClass()),
@@ -1001,9 +954,14 @@ public class DoFnSignatures {
         m, fnT.resolveType(m.getGenericReturnType()));
   }
 
-  /** Generates a {@link TypeDescriptor} for {@code List<T>} given {@code T}. 
*/
-  private static <T> TypeDescriptor<List<T>> listTypeOf(TypeDescriptor<T> 
elementT) {
-    return new TypeDescriptor<List<T>>() {}.where(new TypeParameter<T>() {}, 
elementT);
+  /**
+   * Generates a {@link TypeDescriptor} for {@code 
DoFn.OutputReceiver<OutputT>} given {@code
+   * OutputT}.
+   */
+  private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> 
outputReceiverTypeOf(
+      TypeDescriptor<OutputT> inputT) {
+    return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where(
+        new TypeParameter<OutputT>() {}, inputT);
   }
 
   @VisibleForTesting
@@ -1317,7 +1275,7 @@ public class DoFnSignatures {
           this,
           String.format(
               "parameter of type %s at index %s",
-              param.getType(), param.getIndex()));
+              formatType(param.getType()), param.getIndex()));
     }
 
     void throwIllegalArgument(String message, Object... args) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 4ec39c1..7752b2a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -155,12 +155,18 @@ public class ReflectHelpers {
     }
 
     private void formatParameterizedType(StringBuilder builder, 
ParameterizedType t) {
+      if (t.getOwnerType() != null) {
+        format(builder, t.getOwnerType());
+        builder.append('.');
+      }
       format(builder, t.getRawType());
-      builder.append('<');
-      COMMA_SEPARATOR.appendTo(builder,
-          FluentIterable.from(asList(t.getActualTypeArguments()))
-          .transform(TYPE_SIMPLE_DESCRIPTION));
-      builder.append('>');
+      if (t.getActualTypeArguments().length > 0) {
+        builder.append('<');
+        COMMA_SEPARATOR.appendTo(builder,
+            FluentIterable.from(asList(t.getActualTypeArguments()))
+                .transform(TYPE_SIMPLE_DESCRIPTION));
+        builder.append('>');
+      }
     }
 
     private void formatGenericArrayType(StringBuilder builder, 
GenericArrayType t) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 4c6bee1..9bc2d12 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -73,16 +73,12 @@ public class DoFnInvokersTest {
 
   @Mock private DoFn<String, String>.ProcessContext mockProcessContext;
   @Mock private IntervalWindow mockWindow;
-  @Mock private DoFn.InputProvider<String> mockInputProvider;
-  @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
   @Mock private DoFnInvoker.ArgumentProvider<String, String> 
mockArgumentProvider;
 
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     when(mockArgumentProvider.window()).thenReturn(mockWindow);
-    when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider);
-    when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver);
     
when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }
 
@@ -231,28 +227,6 @@ public class DoFnInvokersTest {
   }
 
   @Test
-  public void testDoFnWithOutputReceiver() throws Exception {
-    class MockFn extends DoFn<String, String> {
-      @DoFn.ProcessElement
-      public void processElement(ProcessContext c, OutputReceiver<String> o) 
throws Exception {}
-    }
-    MockFn fn = mock(MockFn.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
-    verify(fn).processElement(mockProcessContext, mockOutputReceiver);
-  }
-
-  @Test
-  public void testDoFnWithInputProvider() throws Exception {
-    class MockFn extends DoFn<String, String> {
-      @DoFn.ProcessElement
-      public void processElement(ProcessContext c, InputProvider<String> o) 
throws Exception {}
-    }
-    MockFn fn = mock(MockFn.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
-    verify(fn).processElement(mockProcessContext, mockInputProvider);
-  }
-
-  @Test
   public void testDoFnWithReturn() throws Exception {
     class MockFn extends DoFn<String, String> {
       @DoFn.ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index 5255af8..44ae5c4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -65,41 +65,35 @@ public class DoFnSignaturesProcessElementTest {
     analyzeProcessElementMethod(
         new AnonymousMethod() {
           private void method(
-              DoFn<Integer, String>.ProcessContext c,
-              DoFn.InputProvider<Integer> input,
-              DoFn.OutputReceiver<String> output) {}
+              DoFn<Integer, String>.ProcessContext c) {}
         });
   }
 
   @Test
   public void testBadGenericsTwoArgs() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("OutputReceiver<Integer>");
-    thrown.expectMessage("should be");
-    thrown.expectMessage("OutputReceiver<String>");
+    thrown.expectMessage("DoFn<Integer, Integer>.ProcessContext");
+    thrown.expectMessage("must have type");
+    thrown.expectMessage("DoFn<Integer, String>.ProcessContext");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {
           private void method(
-              DoFn<Integer, String>.ProcessContext c,
-              DoFn.InputProvider<Integer> input,
-              DoFn.OutputReceiver<Integer> output) {}
+              DoFn<Integer, Integer>.ProcessContext c) {}
         });
   }
 
   @Test
   public void testBadGenericWildCards() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("OutputReceiver<? super Integer>");
-    thrown.expectMessage("should be");
-    thrown.expectMessage("OutputReceiver<String>");
+    thrown.expectMessage("DoFn<Integer, ? super Integer>.ProcessContext");
+    thrown.expectMessage("must have type");
+    thrown.expectMessage("DoFn<Integer, String>.ProcessContext");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {
           private void method(
-              DoFn<Integer, String>.ProcessContext c,
-              DoFn.InputProvider<Integer> input,
-              DoFn.OutputReceiver<? super Integer> output) {}
+              DoFn<Integer, ? super Integer>.ProcessContext c) {}
         });
   }
 
@@ -107,17 +101,15 @@ public class DoFnSignaturesProcessElementTest {
     @ProcessElement
     @SuppressWarnings("unused")
     public void badTypeVariables(
-        DoFn<InputT, OutputT>.ProcessContext c,
-        DoFn.InputProvider<InputT> input,
-        DoFn.OutputReceiver<InputT> output) {}
+        DoFn<InputT, InputT>.ProcessContext c) {}
   }
 
   @Test
   public void testBadTypeVariables() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("OutputReceiver<InputT>");
-    thrown.expectMessage("should be");
-    thrown.expectMessage("OutputReceiver<OutputT>");
+    thrown.expectMessage("DoFn<InputT, InputT>.ProcessContext");
+    thrown.expectMessage("must have type");
+    thrown.expectMessage("DoFn<InputT, OutputT>.ProcessContext");
 
     DoFnSignatures.getSignature(BadTypeVariables.class);
   }
@@ -164,9 +156,7 @@ public class DoFnSignaturesProcessElementTest {
     @ProcessElement
     @SuppressWarnings("unused")
     public void goodTypeVariables(
-        DoFn<InputT, OutputT>.ProcessContext c,
-        DoFn.InputProvider<InputT> input,
-        DoFn.OutputReceiver<OutputT> output) {}
+        DoFn<InputT, OutputT>.ProcessContext c) {}
   }
 
   @Test
@@ -177,7 +167,7 @@ public class DoFnSignaturesProcessElementTest {
   private static class IdentityFn<T> extends DoFn<T, T> {
     @ProcessElement
     @SuppressWarnings("unused")
-    public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) {
+    public void processElement(ProcessContext c) {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 7b594c9..c10d199 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -403,7 +403,8 @@ public class DoFnSignaturesSplittableDoFnTest {
   @Test
   public void testSplitRestrictionReturnsWrongType() throws Exception {
     thrown.expectMessage(
-        "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>");
+        "Third argument must be DoFn.OutputReceiver<SomeRestriction>, "
+            + "but is DoFn.OutputReceiver<String>");
     DoFnSignatures.analyzeSplitRestrictionMethod(
         errors(),
         TypeDescriptor.of(FakeDoFn.class),

http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 69d4058..e1fa2d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -76,7 +76,7 @@ public class DoFnSignaturesTest {
   @Test
   public void testBadExtraContext() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must take a single argument of type Context");
+    thrown.expectMessage("Must take a single argument of type DoFn<Integer, String>.Context");
 
     DoFnSignatures.analyzeBundleMethod(
         errors(),
@@ -656,10 +656,10 @@ public class DoFnSignaturesTest {
   @Test
   public void testStateParameterWrongGenericType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("ValueState<java.lang.String>");
+    thrown.expectMessage("ValueState<String>");
     thrown.expectMessage("reference to");
     thrown.expectMessage("different type");
-    thrown.expectMessage("ValueState<java.lang.Integer>");
+    thrown.expectMessage("ValueState<Integer>");
     thrown.expectMessage("my-id");
     thrown.expectMessage("myProcessElement");
     thrown.expectMessage("index 1");

Reply via email to