Repository: beam Updated Branches: refs/heads/master 77b136603 -> 38208eaa4
Removes inputProvider() and outputReceiver(). Removes InputProvider itself too. Does not remove OutputReceiver because it's used in the @SplitRestriction method. Cleans up tests that looked at InputProvider/OutputReceiver parameters - instead now they look at the DoFn.ProcessContext parameter, and I improved the formatting of parameter types too. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d787bdd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d787bdd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d787bdd Branch: refs/heads/master Commit: 7d787bddc80fa832576407c313b26d5436dfa393 Parents: 34b4a6d Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Jan 27 14:56:56 2017 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Jan 27 15:00:19 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/runners/core/DoFnAdapters.java | 20 ----- .../beam/runners/core/SimpleDoFnRunner.java | 32 -------- .../beam/runners/core/SplittableParDo.java | 12 --- .../org/apache/beam/sdk/transforms/DoFn.java | 6 -- .../apache/beam/sdk/transforms/DoFnTester.java | 12 --- .../reflect/ByteBuddyDoFnInvokerFactory.java | 12 --- .../sdk/transforms/reflect/DoFnInvoker.java | 18 ----- .../sdk/transforms/reflect/DoFnSignature.java | 56 ------------- .../sdk/transforms/reflect/DoFnSignatures.java | 82 +++++--------------- .../beam/sdk/util/common/ReflectHelpers.java | 16 ++-- .../transforms/reflect/DoFnInvokersTest.java | 26 ------- .../DoFnSignaturesProcessElementTest.java | 40 ++++------ .../DoFnSignaturesSplittableDoFnTest.java | 3 +- .../transforms/reflect/DoFnSignaturesTest.java | 6 +- 14 files changed, 51 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java 
---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 23aba58..dcd7969 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -204,16 +204,6 @@ public class DoFnAdapters { } @Override - public DoFn.InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("inputProvider() exists only for testing"); - } - - @Override - public DoFn.OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("outputReceiver() exists only for testing"); - } - - @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException("This is a non-splittable DoFn"); } @@ -316,16 +306,6 @@ public class DoFnAdapters { } @Override - public DoFn.InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("inputProvider() exists only for testing"); - } - - @Override - public DoFn.OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("outputReceiver() exists only for testing"); - } - - @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException("This is a non-splittable DoFn"); } http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index df5f3f6..d54daf6 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -34,9 +34,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; -import org.apache.beam.sdk.transforms.DoFn.InputProvider; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; -import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -439,16 +437,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("InputProvider is for testing only."); - } - - @Override - public OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("OutputReceiver is for testing only."); - } - - @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException( "Cannot access RestrictionTracker outside of @ProcessElement method."); @@ -626,16 +614,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("InputProvider parameters are not supported."); - } - - @Override - public OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("OutputReceiver parameters are not supported."); - } - - @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); } @@ -744,16 +722,6 @@ public class 
SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("InputProvider parameters are not supported."); - } - - @Override - public OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("OutputReceiver parameters are not supported."); - } - - @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); } http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index f8d12ec..d1cbf8f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -672,18 +672,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } @Override - public DoFn.InputProvider<InputT> inputProvider() { - // DoFnSignatures should have verified that this DoFn doesn't access extra context. - throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); - } - - @Override - public DoFn.OutputReceiver<OutputT> outputReceiver() { - // DoFnSignatures should have verified that this DoFn doesn't access extra context. 
- throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); - } - - @Override public TrackerT restrictionTracker() { return tracker; } http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 699403f..a161919 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -372,12 +372,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD public interface OutputReceiver<T> { void output(T output); } - - /** Provides a single value of the given type. */ - public interface InputProvider<T> { - T get(); - } - ///////////////////////////////////////////////////////////////////////////// /** http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index b2c3fd5..0d1f96d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -315,18 +315,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - public DoFn.InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException( - "Not expected to access InputProvider from DoFnTester"); - } - - @Override - public DoFn.OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException( - 
"Not expected to access OutputReceiver from DoFnTester"); - } - - @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException( "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester"); http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 01ddd86..46b21d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -68,9 +68,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; @@ -572,16 +570,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } @Override - public StackManipulation 
dispatch(InputProviderParameter p) { - return simpleExtraContextParameter(INPUT_PROVIDER_PARAMETER_METHOD); - } - - @Override - public StackManipulation dispatch(OutputReceiverParameter p) { - return simpleExtraContextParameter(OUTPUT_RECEIVER_PARAMETER_METHOD); - } - - @Override public StackManipulation dispatch(RestrictionTrackerParameter p) { // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker, // but the @ProcessElement method expects a concrete subtype of it. http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 354578e..5f61349 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -21,8 +21,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundle; -import org.apache.beam.sdk.transforms.DoFn.InputProvider; -import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFn.StartBundle; import org.apache.beam.sdk.transforms.DoFn.StateId; @@ -113,12 +111,6 @@ public interface DoFnInvoker<InputT, OutputT> { /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link DoFn}. */ DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn); - /** A placeholder for testing purposes. */ - InputProvider<InputT> inputProvider(); - - /** A placeholder for testing purposes. 
*/ - OutputReceiver<OutputT> outputReceiver(); - /** * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with * the current {@link ProcessElement} call. @@ -155,16 +147,6 @@ public interface DoFnInvoker<InputT, OutputT> { } @Override - public InputProvider<InputT> inputProvider() { - return null; - } - - @Override - public OutputReceiver<OutputT> outputReceiver() { - return null; - } - - @Override public State state(String stateId) { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index f470782..007d8be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -28,8 +28,6 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.InputProvider; -import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; @@ -190,10 +188,6 @@ public abstract class DoFnSignature { return cases.dispatch((WindowParameter) this); } else if (this instanceof RestrictionTrackerParameter) { return cases.dispatch((RestrictionTrackerParameter) this); - } else if (this instanceof InputProviderParameter) { - return cases.dispatch((InputProviderParameter) this); - } else if (this instanceof OutputReceiverParameter) { - return cases.dispatch((OutputReceiverParameter) 
this); } else if (this instanceof StateParameter) { return cases.dispatch((StateParameter) this); } else if (this instanceof TimerParameter) { @@ -213,8 +207,6 @@ public abstract class DoFnSignature { ResultT dispatch(ProcessContextParameter p); ResultT dispatch(OnTimerContextParameter p); ResultT dispatch(WindowParameter p); - ResultT dispatch(InputProviderParameter p); - ResultT dispatch(OutputReceiverParameter p); ResultT dispatch(RestrictionTrackerParameter p); ResultT dispatch(StateParameter p); ResultT dispatch(TimerParameter p); @@ -247,16 +239,6 @@ public abstract class DoFnSignature { } @Override - public ResultT dispatch(InputProviderParameter p) { - return dispatchDefault(p); - } - - @Override - public ResultT dispatch(OutputReceiverParameter p) { - return dispatchDefault(p); - } - - @Override public ResultT dispatch(RestrictionTrackerParameter p) { return dispatchDefault(p); } @@ -280,10 +262,6 @@ public abstract class DoFnSignature { new AutoValue_DoFnSignature_Parameter_ProcessContextParameter(); private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER = new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter(); - private static final InputProviderParameter INPUT_PROVIDER_PARAMETER = - new AutoValue_DoFnSignature_Parameter_InputProviderParameter(); - private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER = - new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter(); /** Returns a {@link ContextParameter}. */ public static ContextParameter context() { @@ -306,20 +284,6 @@ public abstract class DoFnSignature { } /** - * Returns an {@link InputProviderParameter}. - */ - public static InputProviderParameter inputProvider() { - return INPUT_PROVIDER_PARAMETER; - } - - /** - * Returns an {@link OutputReceiverParameter}. - */ - public static OutputReceiverParameter outputReceiver() { - return OUTPUT_RECEIVER_PARAMETER; - } - - /** * Returns a {@link RestrictionTrackerParameter}. 
*/ public static RestrictionTrackerParameter restrictionTracker(TypeDescriptor<?> trackerT) { @@ -378,26 +342,6 @@ public abstract class DoFnSignature { } /** - * Descriptor for a {@link Parameter} of type {@link InputProvider}. - * - * <p>All such descriptors are equal. - */ - @AutoValue - public abstract static class InputProviderParameter extends Parameter { - InputProviderParameter() {} - } - - /** - * Descriptor for a {@link Parameter} of type {@link OutputReceiver}. - * - * <p>All such descriptors are equal. - */ - @AutoValue - public abstract static class OutputReceiverParameter extends Parameter { - OutputReceiverParameter() {} - } - - /** * Descriptor for a {@link Parameter} of a subclass of {@link RestrictionTracker}. * * <p>All such descriptors are equal. http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index b6b764e..61b9157 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -80,9 +80,7 @@ public class DoFnSignatures { Parameter.ProcessContextParameter.class, Parameter.WindowParameter.class, Parameter.TimerParameter.class, - Parameter.StateParameter.class, - Parameter.InputProviderParameter.class, - Parameter.OutputReceiverParameter.class); + Parameter.StateParameter.class); private static final Collection<Class<? 
extends Parameter>> ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS = @@ -617,25 +615,6 @@ public class DoFnSignatures { .where(new TypeParameter<OutputT>() {}, outputT); } - /** - * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}. - */ - private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf( - TypeDescriptor<InputT> inputT) { - return new TypeDescriptor<DoFn.InputProvider<InputT>>() {}.where( - new TypeParameter<InputT>() {}, inputT); - } - - /** - * Generates a {@link TypeDescriptor} for {@code DoFn.OutputReceiver<OutputT>} given {@code - * OutputT}. - */ - private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf( - TypeDescriptor<OutputT> inputT) { - return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where( - new TypeParameter<OutputT>() {}, inputT); - } - @VisibleForTesting static DoFnSignature.OnTimerMethod analyzeOnTimerMethod( ErrorReporter errors, @@ -767,8 +746,6 @@ public class DoFnSignatures { TypeDescriptor<?> expectedProcessContextT = doFnProcessContextTypeOf(inputT, outputT); TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT); TypeDescriptor<?> expectedOnTimerContextT = doFnOnTimerContextTypeOf(inputT, outputT); - TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT); - TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); TypeDescriptor<?> paramT = param.getType(); Class<?> rawType = paramT.getRawType(); @@ -776,51 +753,27 @@ public class DoFnSignatures { ErrorReporter paramErrors = methodErrors.forParameter(param); if (rawType.equals(DoFn.ProcessContext.class)) { - methodErrors.checkArgument(paramT.equals(expectedProcessContextT), - "Must take %s as the ProcessContext argument", + paramErrors.checkArgument(paramT.equals(expectedProcessContextT), + "ProcessContext argument must have type %s", formatType(expectedProcessContextT)); return Parameter.processContext(); } else 
if (rawType.equals(DoFn.Context.class)) { - methodErrors.checkArgument(paramT.equals(expectedContextT), - "Must take %s as the Context argument", + paramErrors.checkArgument(paramT.equals(expectedContextT), + "Context argument must have type %s", formatType(expectedContextT)); return Parameter.context(); } else if (rawType.equals(DoFn.OnTimerContext.class)) { - methodErrors.checkArgument(paramT.equals(expectedOnTimerContextT), - "Must take %s as the OnTimerContext argument", - formatType(expectedOnTimerContextT)); - return Parameter.onTimerContext(); + paramErrors.checkArgument( + paramT.equals(expectedOnTimerContextT), + "OnTimerContext argument must have type %s", + formatType(expectedOnTimerContextT)); + return Parameter.onTimerContext(); } else if (BoundedWindow.class.isAssignableFrom(rawType)) { methodErrors.checkArgument( !methodContext.hasWindowParameter(), "Multiple %s parameters", BoundedWindow.class.getSimpleName()); return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT); - } else if (rawType.equals(DoFn.InputProvider.class)) { - methodErrors.checkArgument( - !methodContext.getExtraParameters().contains(Parameter.inputProvider()), - "Multiple %s parameters", - DoFn.InputProvider.class.getSimpleName()); - paramErrors.checkArgument( - paramT.equals(expectedInputProviderT), - "%s is for %s when it should be %s", - DoFn.InputProvider.class.getSimpleName(), - formatType(paramT), - formatType(expectedInputProviderT)); - return Parameter.inputProvider(); - } else if (rawType.equals(DoFn.OutputReceiver.class)) { - methodErrors.checkArgument( - !methodContext.getExtraParameters().contains(Parameter.outputReceiver()), - "Multiple %s parameters", - DoFn.OutputReceiver.class.getSimpleName()); - paramErrors.checkArgument( - paramT.equals(expectedOutputReceiverT), - "%s is for %s when it should be %s", - DoFn.OutputReceiver.class.getSimpleName(), - formatType(paramT), - formatType(expectedOutputReceiverT)); - return Parameter.outputReceiver(); 
- } else if (RestrictionTracker.class.isAssignableFrom(rawType)) { methodErrors.checkArgument( !methodContext.hasRestrictionTrackerParameter(), @@ -890,7 +843,7 @@ public class DoFnSignatures { "reference to %s %s with different type %s", StateId.class.getSimpleName(), id, - stateDecl.stateType()); + formatType(stateDecl.stateType())); paramErrors.checkArgument( stateDecl.field().getDeclaringClass().equals(param.getMethod().getDeclaringClass()), @@ -1001,9 +954,14 @@ public class DoFnSignatures { m, fnT.resolveType(m.getGenericReturnType())); } - /** Generates a {@link TypeDescriptor} for {@code List<T>} given {@code T}. */ - private static <T> TypeDescriptor<List<T>> listTypeOf(TypeDescriptor<T> elementT) { - return new TypeDescriptor<List<T>>() {}.where(new TypeParameter<T>() {}, elementT); + /** + * Generates a {@link TypeDescriptor} for {@code DoFn.OutputReceiver<OutputT>} given {@code + * OutputT}. + */ + private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf( + TypeDescriptor<OutputT> inputT) { + return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where( + new TypeParameter<OutputT>() {}, inputT); } @VisibleForTesting @@ -1317,7 +1275,7 @@ public class DoFnSignatures { this, String.format( "parameter of type %s at index %s", - param.getType(), param.getIndex())); + formatType(param.getType()), param.getIndex())); } void throwIllegalArgument(String message, Object... 
args) { http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java index 4ec39c1..7752b2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java @@ -155,12 +155,18 @@ public class ReflectHelpers { } private void formatParameterizedType(StringBuilder builder, ParameterizedType t) { + if (t.getOwnerType() != null) { + format(builder, t.getOwnerType()); + builder.append('.'); + } format(builder, t.getRawType()); - builder.append('<'); - COMMA_SEPARATOR.appendTo(builder, - FluentIterable.from(asList(t.getActualTypeArguments())) - .transform(TYPE_SIMPLE_DESCRIPTION)); - builder.append('>'); + if (t.getActualTypeArguments().length > 0) { + builder.append('<'); + COMMA_SEPARATOR.appendTo(builder, + FluentIterable.from(asList(t.getActualTypeArguments())) + .transform(TYPE_SIMPLE_DESCRIPTION)); + builder.append('>'); + } } private void formatGenericArrayType(StringBuilder builder, GenericArrayType t) { http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 4c6bee1..9bc2d12 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ 
-73,16 +73,12 @@ public class DoFnInvokersTest { @Mock private DoFn<String, String>.ProcessContext mockProcessContext; @Mock private IntervalWindow mockWindow; - @Mock private DoFn.InputProvider<String> mockInputProvider; - @Mock private DoFn.OutputReceiver<String> mockOutputReceiver; @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider; @Before public void setUp() { MockitoAnnotations.initMocks(this); when(mockArgumentProvider.window()).thenReturn(mockWindow); - when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider); - when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver); when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext); } @@ -231,28 +227,6 @@ public class DoFnInvokersTest { } @Test - public void testDoFnWithOutputReceiver() throws Exception { - class MockFn extends DoFn<String, String> { - @DoFn.ProcessElement - public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {} - } - MockFn fn = mock(MockFn.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); - verify(fn).processElement(mockProcessContext, mockOutputReceiver); - } - - @Test - public void testDoFnWithInputProvider() throws Exception { - class MockFn extends DoFn<String, String> { - @DoFn.ProcessElement - public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {} - } - MockFn fn = mock(MockFn.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); - verify(fn).processElement(mockProcessContext, mockInputProvider); - } - - @Test public void testDoFnWithReturn() throws Exception { class MockFn extends DoFn<String, String> { @DoFn.ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java index 5255af8..44ae5c4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -65,41 +65,35 @@ public class DoFnSignaturesProcessElementTest { analyzeProcessElementMethod( new AnonymousMethod() { private void method( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<Integer> input, - DoFn.OutputReceiver<String> output) {} + DoFn<Integer, String>.ProcessContext c) {} }); } @Test public void testBadGenericsTwoArgs() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("OutputReceiver<Integer>"); - thrown.expectMessage("should be"); - thrown.expectMessage("OutputReceiver<String>"); + thrown.expectMessage("DoFn<Integer, Integer>.ProcessContext"); + thrown.expectMessage("must have type"); + thrown.expectMessage("DoFn<Integer, String>.ProcessContext"); analyzeProcessElementMethod( new AnonymousMethod() { private void method( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<Integer> input, - DoFn.OutputReceiver<Integer> output) {} + DoFn<Integer, Integer>.ProcessContext c) {} }); } @Test public void testBadGenericWildCards() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("OutputReceiver<? super Integer>"); - thrown.expectMessage("should be"); - thrown.expectMessage("OutputReceiver<String>"); + thrown.expectMessage("DoFn<Integer, ? 
super Integer>.ProcessContext"); + thrown.expectMessage("must have type"); + thrown.expectMessage("DoFn<Integer, String>.ProcessContext"); analyzeProcessElementMethod( new AnonymousMethod() { private void method( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<Integer> input, - DoFn.OutputReceiver<? super Integer> output) {} + DoFn<Integer, ? super Integer>.ProcessContext c) {} }); } @@ -107,17 +101,15 @@ public class DoFnSignaturesProcessElementTest { @ProcessElement @SuppressWarnings("unused") public void badTypeVariables( - DoFn<InputT, OutputT>.ProcessContext c, - DoFn.InputProvider<InputT> input, - DoFn.OutputReceiver<InputT> output) {} + DoFn<InputT, InputT>.ProcessContext c) {} } @Test public void testBadTypeVariables() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("OutputReceiver<InputT>"); - thrown.expectMessage("should be"); - thrown.expectMessage("OutputReceiver<OutputT>"); + thrown.expectMessage("DoFn<InputT, InputT>.ProcessContext"); + thrown.expectMessage("must have type"); + thrown.expectMessage("DoFn<InputT, OutputT>.ProcessContext"); DoFnSignatures.getSignature(BadTypeVariables.class); } @@ -164,9 +156,7 @@ public class DoFnSignaturesProcessElementTest { @ProcessElement @SuppressWarnings("unused") public void goodTypeVariables( - DoFn<InputT, OutputT>.ProcessContext c, - DoFn.InputProvider<InputT> input, - DoFn.OutputReceiver<OutputT> output) {} + DoFn<InputT, OutputT>.ProcessContext c) {} } @Test @@ -177,7 +167,7 @@ public class DoFnSignaturesProcessElementTest { private static class IdentityFn<T> extends DoFn<T, T> { @ProcessElement @SuppressWarnings("unused") - public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) { + public void processElement(ProcessContext c) { c.output(c.element()); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 7b594c9..c10d199 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -403,7 +403,8 @@ public class DoFnSignaturesSplittableDoFnTest { @Test public void testSplitRestrictionReturnsWrongType() throws Exception { thrown.expectMessage( - "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>"); + "Third argument must be DoFn.OutputReceiver<SomeRestriction>, " + + "but is DoFn.OutputReceiver<String>"); DoFnSignatures.analyzeSplitRestrictionMethod( errors(), TypeDescriptor.of(FakeDoFn.class), http://git-wip-us.apache.org/repos/asf/beam/blob/7d787bdd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 69d4058..e1fa2d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -76,7 +76,7 @@ public class DoFnSignaturesTest { @Test public void testBadExtraContext() throws Exception { thrown.expect(IllegalArgumentException.class); - 
thrown.expectMessage("Must take a single argument of type Context"); + thrown.expectMessage("Must take a single argument of type DoFn<Integer, String>.Context"); DoFnSignatures.analyzeBundleMethod( errors(), @@ -656,10 +656,10 @@ public class DoFnSignaturesTest { @Test public void testStateParameterWrongGenericType() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("ValueState<java.lang.String>"); + thrown.expectMessage("ValueState<String>"); thrown.expectMessage("reference to"); thrown.expectMessage("different type"); - thrown.expectMessage("ValueState<java.lang.Integer>"); + thrown.expectMessage("ValueState<Integer>"); thrown.expectMessage("my-id"); thrown.expectMessage("myProcessElement"); thrown.expectMessage("index 1");