Switch DoFnSignature, etc, from TypeToken to TypeDescriptor
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8336b24c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8336b24c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8336b24c Branch: refs/heads/apex-runner Commit: 8336b24c97c620fa3edb02301299080bda96379a Parents: d936ed8 Author: Kenneth Knowles <k...@google.com> Authored: Tue Nov 1 14:48:54 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Nov 3 21:32:53 2016 -0700 ---------------------------------------------------------------------- .../sdk/transforms/reflect/DoFnInvokers.java | 7 +- .../sdk/transforms/reflect/DoFnSignature.java | 23 ++- .../sdk/transforms/reflect/DoFnSignatures.java | 177 ++++++++++--------- .../DoFnSignaturesSplittableDoFnTest.java | 18 +- .../transforms/reflect/DoFnSignaturesTest.java | 7 +- .../reflect/DoFnSignaturesTestUtils.java | 8 +- 6 files changed, 124 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index dd134b7..c5a23dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.reflect; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.reflect.TypeToken; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -263,9 +262,9 @@ public class DoFnInvokers { /** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */ public static class DefaultRestrictionCoder { - private final TypeToken<?> restrictionType; + private final TypeDescriptor<?> restrictionType; - DefaultRestrictionCoder(TypeToken<?> restrictionType) { + DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) { this.restrictionType = restrictionType; } @@ -273,7 +272,7 @@ public class DoFnInvokers { @SuppressWarnings({"unused", "unchecked"}) public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry) throws CannotProvideCoderException { - return (Coder) registry.getCoder(TypeDescriptor.of(restrictionType.getType())); + return (Coder) registry.getCoder(restrictionType); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 71f7e53..6b98805 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.reflect; import com.google.auto.value.AutoValue; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; -import com.google.common.reflect.TypeToken; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Collections; @@ -342,7 +341,7 @@ public abstract class DoFnSignature { /** Concrete type of the {@link RestrictionTracker} parameter, if present. */ @Nullable - abstract TypeToken<?> trackerT(); + abstract TypeDescriptor<?> trackerT(); /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */ public abstract boolean hasReturnValue(); @@ -350,7 +349,7 @@ public abstract class DoFnSignature { static ProcessElementMethod create( Method targetMethod, List<Parameter> extraParameters, - TypeToken<?> trackerT, + TypeDescriptor<?> trackerT, boolean hasReturnValue) { return new AutoValue_DoFnSignature_ProcessElementMethod( targetMethod, Collections.unmodifiableList(extraParameters), trackerT, hasReturnValue); @@ -462,9 +461,9 @@ public abstract class DoFnSignature { public abstract Method targetMethod(); /** Type of the returned restriction. */ - abstract TypeToken<?> restrictionT(); + abstract TypeDescriptor<?> restrictionT(); - static GetInitialRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) { + static GetInitialRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) { return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(targetMethod, restrictionT); } } @@ -477,9 +476,9 @@ public abstract class DoFnSignature { public abstract Method targetMethod(); /** Type of the restriction taken and returned. */ - abstract TypeToken<?> restrictionT(); + abstract TypeDescriptor<?> restrictionT(); - static SplitRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) { + static SplitRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) { return new AutoValue_DoFnSignature_SplitRestrictionMethod(targetMethod, restrictionT); } } @@ -492,13 +491,13 @@ public abstract class DoFnSignature { public abstract Method targetMethod(); /** Type of the input restriction. */ - abstract TypeToken<?> restrictionT(); + abstract TypeDescriptor<?> restrictionT(); /** Type of the returned {@link RestrictionTracker}. */ - abstract TypeToken<?> trackerT(); + abstract TypeDescriptor<?> trackerT(); static NewTrackerMethod create( - Method targetMethod, TypeToken<?> restrictionT, TypeToken<?> trackerT) { + Method targetMethod, TypeDescriptor<?> restrictionT, TypeDescriptor<?> trackerT) { return new AutoValue_DoFnSignature_NewTrackerMethod(targetMethod, restrictionT, trackerT); } } @@ -511,9 +510,9 @@ public abstract class DoFnSignature { public abstract Method targetMethod(); /** Type of the returned {@link Coder}. */ - abstract TypeToken<?> coderT(); + abstract TypeDescriptor<?> coderT(); - static GetRestrictionCoderMethod create(Method targetMethod, TypeToken<?> coderT) { + static GetRestrictionCoderMethod create(Method targetMethod, TypeDescriptor<?> coderT) { return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 5814c0e..c690ace 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -22,8 +22,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.google.common.reflect.TypeParameter; -import com.google.common.reflect.TypeToken; import java.lang.annotation.Annotation; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Field; @@ -57,6 +55,7 @@ import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; /** * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getSignature}. @@ -90,18 +89,18 @@ public class DoFnSignatures { errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn"); builder.setFnClass(fnClass); - TypeToken<? extends DoFn<?, ?>> fnToken = TypeToken.of(fnClass); + TypeDescriptor<? extends DoFn<?, ?>> fnT = TypeDescriptor.of(fnClass); // Extract the input and output type, and whether the fn is bounded. - TypeToken<?> inputT = null; - TypeToken<?> outputT = null; - for (TypeToken<?> supertype : fnToken.getTypes()) { + TypeDescriptor<?> inputT = null; + TypeDescriptor<?> outputT = null; + for (TypeDescriptor<?> supertype : fnT.getTypes()) { if (!supertype.getRawType().equals(DoFn.class)) { continue; } Type[] args = ((ParameterizedType) supertype.getType()).getActualTypeArguments(); - inputT = TypeToken.of(args[0]); - outputT = TypeToken.of(args[1]); + inputT = TypeDescriptor.of(args[0]); + outputT = TypeDescriptor.of(args[1]); } errors.checkNotNull(inputT, "Unable to determine input type"); @@ -169,7 +168,7 @@ public class DoFnSignatures { DoFnSignature.ProcessElementMethod processElement = analyzeProcessElementMethod( processElementErrors, - fnToken, + fnT, processElementMethod, inputT, outputT, @@ -180,14 +179,14 @@ public class DoFnSignatures { if (startBundleMethod != null) { ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod); builder.setStartBundle( - analyzeBundleMethod(startBundleErrors, fnToken, startBundleMethod, inputT, outputT)); + analyzeBundleMethod(startBundleErrors, fnT, startBundleMethod, inputT, outputT)); } if (finishBundleMethod != null) { ErrorReporter finishBundleErrors = errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod); builder.setFinishBundle( - analyzeBundleMethod(finishBundleErrors, fnToken, finishBundleMethod, inputT, outputT)); + analyzeBundleMethod(finishBundleErrors, fnT, finishBundleMethod, inputT, outputT)); } if (setupMethod != null) { @@ -209,7 +208,7 @@ public class DoFnSignatures { builder.setGetInitialRestriction( getInitialRestriction = analyzeGetInitialRestrictionMethod( - getInitialRestrictionErrors, fnToken, getInitialRestrictionMethod, inputT)); + getInitialRestrictionErrors, fnT, getInitialRestrictionMethod, inputT)); } DoFnSignature.SplitRestrictionMethod splitRestriction = null; @@ -219,7 +218,7 @@ public class DoFnSignatures { builder.setSplitRestriction( splitRestriction = analyzeSplitRestrictionMethod( - splitRestrictionErrors, fnToken, splitRestrictionMethod, inputT)); + splitRestrictionErrors, fnT, splitRestrictionMethod, inputT)); } DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null; @@ -229,17 +228,17 @@ public class DoFnSignatures { builder.setGetRestrictionCoder( getRestrictionCoder = analyzeGetRestrictionCoderMethod( - getRestrictionCoderErrors, fnToken, getRestrictionCoderMethod)); + getRestrictionCoderErrors, fnT, getRestrictionCoderMethod)); } DoFnSignature.NewTrackerMethod newTracker = null; if (newTrackerMethod != null) { ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod); builder.setNewTracker( - newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnToken, newTrackerMethod)); + newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod)); } - builder.setIsBoundedPerElement(inferBoundedness(fnToken, processElement, errors)); + builder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors)); DoFnSignature signature = builder.build(); @@ -271,11 +270,11 @@ public class DoFnSignatures { * </ol> */ private static PCollection.IsBounded inferBoundedness( - TypeToken<? extends DoFn> fnToken, + TypeDescriptor<? extends DoFn> fnT, DoFnSignature.ProcessElementMethod processElement, ErrorReporter errors) { PCollection.IsBounded isBounded = null; - for (TypeToken<?> supertype : fnToken.getTypes()) { + for (TypeDescriptor<?> supertype : fnT.getTypes()) { if (supertype.getRawType().isAnnotationPresent(DoFn.BoundedPerElement.class) || supertype.getRawType().isAnnotationPresent(DoFn.UnboundedPerElement.class)) { errors.checkArgument( @@ -354,7 +353,7 @@ public class DoFnSignatures { ErrorReporter getInitialRestrictionErrors = errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod()); - TypeToken<?> restrictionT = getInitialRestriction.restrictionT(); + TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT(); getInitialRestrictionErrors.checkArgument( restrictionT.equals(newTracker.restrictionT()), @@ -411,49 +410,54 @@ public class DoFnSignatures { } /** - * Generates a type token for {@code DoFn<InputT, OutputT>.ProcessContext} given {@code InputT} - * and {@code OutputT}. + * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.ProcessContext} given + * {@code InputT} and {@code OutputT}. */ private static <InputT, OutputT> - TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf( - TypeToken<InputT> inputT, TypeToken<OutputT> outputT) { - return new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}.where( + TypeDescriptor<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf( + TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) { + return new TypeDescriptor<DoFn<InputT, OutputT>.ProcessContext>() {}.where( new TypeParameter<InputT>() {}, inputT) .where(new TypeParameter<OutputT>() {}, outputT); } /** - * Generates a type token for {@code DoFn<InputT, OutputT>.Context} given {@code InputT} and - * {@code OutputT}. + * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.Context} given {@code + * InputT} and {@code OutputT}. */ - private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf( - TypeToken<InputT> inputT, TypeToken<OutputT> outputT) { - return new TypeToken<DoFn<InputT, OutputT>.Context>() {}.where( + private static <InputT, OutputT> TypeDescriptor<DoFn<InputT, OutputT>.Context> doFnContextTypeOf( + TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) { + return new TypeDescriptor<DoFn<InputT, OutputT>.Context>() {}.where( new TypeParameter<InputT>() {}, inputT) .where(new TypeParameter<OutputT>() {}, outputT); } - /** Generates a type token for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */ - private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf( - TypeToken<InputT> inputT) { - return new TypeToken<DoFn.InputProvider<InputT>>() {}.where( + /** + * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}. + */ + private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf( + TypeDescriptor<InputT> inputT) { + return new TypeDescriptor<DoFn.InputProvider<InputT>>() {}.where( new TypeParameter<InputT>() {}, inputT); } - /** Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}. */ - private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf( - TypeToken<OutputT> inputT) { - return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where( + /** + * Generates a {@link TypeDescriptor} for {@code DoFn.OutputReceiver<OutputT>} given {@code + * OutputT}. + */ + private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf( + TypeDescriptor<OutputT> inputT) { + return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where( new TypeParameter<OutputT>() {}, inputT); } @VisibleForTesting static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod( ErrorReporter errors, - TypeToken<? extends DoFn<?, ?>> fnClass, + TypeDescriptor<? extends DoFn<?, ?>> fnClass, Method m, - TypeToken<?> inputT, - TypeToken<?> outputT, + TypeDescriptor<?> inputT, + TypeDescriptor<?> outputT, Map<String, StateDeclaration> stateDeclarations, Map<String, TimerDeclaration> timerDeclarations) { errors.checkArgument( @@ -462,27 +466,27 @@ public class DoFnSignatures { "Must return void or %s", DoFn.ProcessContinuation.class.getSimpleName()); - TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT); + TypeDescriptor<?> processContextT = doFnProcessContextTypeOf(inputT, outputT); Type[] params = m.getGenericParameterTypes(); - TypeToken<?> contextToken = null; + TypeDescriptor<?> contextT = null; if (params.length > 0) { - contextToken = fnClass.resolveType(params[0]); + contextT = fnClass.resolveType(params[0]); } errors.checkArgument( - contextToken != null && contextToken.equals(processContextToken), + contextT != null && contextT.equals(processContextT), "Must take %s as the first argument", - formatType(processContextToken)); + formatType(processContextT)); List<DoFnSignature.Parameter> extraParameters = new ArrayList<>(); Map<String, DoFnSignature.Parameter> stateParameters = new HashMap<>(); Map<String, DoFnSignature.Parameter> timerParameters = new HashMap<>(); - TypeToken<?> trackerT = null; + TypeDescriptor<?> trackerT = null; - TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT); - TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); + TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT); + TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); for (int i = 1; i < params.length; ++i) { - TypeToken<?> paramT = fnClass.resolveType(params[i]); + TypeDescriptor<?> paramT = fnClass.resolveType(params[i]); Class<?> rawType = paramT.getRawType(); if (rawType.equals(BoundedWindow.class)) { errors.checkArgument( @@ -641,8 +645,8 @@ public class DoFnSignatures { } else { List<String> allowedParamTypes = Arrays.asList( - formatType(new TypeToken<BoundedWindow>() {}), - formatType(new TypeToken<RestrictionTracker<?>>() {})); + formatType(new TypeDescriptor<BoundedWindow>() {}), + formatType(new TypeDescriptor<RestrictionTracker<?>>() {})); errors.throwIllegalArgument( "%s is not a valid context parameter. Should be one of %s", formatType(paramT), allowedParamTypes); @@ -665,17 +669,17 @@ public class DoFnSignatures { @VisibleForTesting static DoFnSignature.BundleMethod analyzeBundleMethod( ErrorReporter errors, - TypeToken<? extends DoFn<?, ?>> fnToken, + TypeDescriptor<? extends DoFn<?, ?>> fnT, Method m, - TypeToken<?> inputT, - TypeToken<?> outputT) { + TypeDescriptor<?> inputT, + TypeDescriptor<?> outputT) { errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void"); - TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT); + TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT); Type[] params = m.getGenericParameterTypes(); errors.checkArgument( - params.length == 1 && fnToken.resolveType(params[0]).equals(expectedContextToken), + params.length == 1 && fnT.resolveType(params[0]).equals(expectedContextT), "Must take a single argument of type %s", - formatType(expectedContextToken)); + formatType(expectedContextT)); return DoFnSignature.BundleMethod.create(m); } @@ -688,27 +692,33 @@ public class DoFnSignatures { @VisibleForTesting static DoFnSignature.GetInitialRestrictionMethod analyzeGetInitialRestrictionMethod( - ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) { + ErrorReporter errors, + TypeDescriptor<? extends DoFn> fnT, + Method m, + TypeDescriptor<?> inputT) { // Method is of the form: // @GetInitialRestriction // RestrictionT getInitialRestriction(InputT element); Type[] params = m.getGenericParameterTypes(); errors.checkArgument( - params.length == 1 && fnToken.resolveType(params[0]).equals(inputT), + params.length == 1 && fnT.resolveType(params[0]).equals(inputT), "Must take a single argument of type %s", formatType(inputT)); return DoFnSignature.GetInitialRestrictionMethod.create( - m, fnToken.resolveType(m.getGenericReturnType())); + m, fnT.resolveType(m.getGenericReturnType())); } - /** Generates a type token for {@code List<T>} given {@code T}. */ - private static <T> TypeToken<List<T>> listTypeOf(TypeToken<T> elementT) { - return new TypeToken<List<T>>() {}.where(new TypeParameter<T>() {}, elementT); + /** Generates a {@link TypeDescriptor} for {@code List<T>} given {@code T}. */ + private static <T> TypeDescriptor<List<T>> listTypeOf(TypeDescriptor<T> elementT) { + return new TypeDescriptor<List<T>>() {}.where(new TypeParameter<T>() {}, elementT); } @VisibleForTesting static DoFnSignature.SplitRestrictionMethod analyzeSplitRestrictionMethod( - ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) { + ErrorReporter errors, + TypeDescriptor<? extends DoFn> fnT, + Method m, + TypeDescriptor<?> inputT) { // Method is of the form: // @SplitRestriction // void splitRestriction(InputT element, RestrictionT restriction); @@ -717,13 +727,13 @@ public class DoFnSignatures { Type[] params = m.getGenericParameterTypes(); errors.checkArgument(params.length == 3, "Must have exactly 3 arguments"); errors.checkArgument( - fnToken.resolveType(params[0]).equals(inputT), + fnT.resolveType(params[0]).equals(inputT), "First argument must be the element type %s", formatType(inputT)); - TypeToken<?> restrictionT = fnToken.resolveType(params[1]); - TypeToken<?> receiverT = fnToken.resolveType(params[2]); - TypeToken<?> expectedReceiverT = outputReceiverTypeOf(restrictionT); + TypeDescriptor<?> restrictionT = fnT.resolveType(params[1]); + TypeDescriptor<?> receiverT = fnT.resolveType(params[2]); + TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(restrictionT); errors.checkArgument( receiverT.equals(expectedReceiverT), "Third argument must be %s, but is %s", @@ -777,45 +787,46 @@ public class DoFnSignatures { } } - /** Generates a type token for {@code Coder<T>} given {@code T}. */ - private static <T> TypeToken<Coder<T>> coderTypeOf(TypeToken<T> elementT) { - return new TypeToken<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT); + /** Generates a {@link TypeDescriptor} for {@code Coder<T>} given {@code T}. */ + private static <T> TypeDescriptor<Coder<T>> coderTypeOf(TypeDescriptor<T> elementT) { + return new TypeDescriptor<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT); } @VisibleForTesting static DoFnSignature.GetRestrictionCoderMethod analyzeGetRestrictionCoderMethod( - ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) { + ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) { errors.checkArgument(m.getParameterTypes().length == 0, "Must have zero arguments"); - TypeToken<?> resT = fnToken.resolveType(m.getGenericReturnType()); + TypeDescriptor<?> resT = fnT.resolveType(m.getGenericReturnType()); errors.checkArgument( - resT.isSubtypeOf(TypeToken.of(Coder.class)), + resT.isSubtypeOf(TypeDescriptor.of(Coder.class)), "Must return a Coder, but returns %s", formatType(resT)); return DoFnSignature.GetRestrictionCoderMethod.create(m, resT); } /** - * Generates a type token for {@code RestrictionTracker<RestrictionT>} given {@code RestrictionT}. + * Generates a {@link TypeDescriptor} for {@code RestrictionTracker<RestrictionT>} given {@code + * RestrictionT}. */ private static <RestrictionT> - TypeToken<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf( - TypeToken<RestrictionT> restrictionT) { - return new TypeToken<RestrictionTracker<RestrictionT>>() {}.where( + TypeDescriptor<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf( + TypeDescriptor<RestrictionT> restrictionT) { + return new TypeDescriptor<RestrictionTracker<RestrictionT>>() {}.where( new TypeParameter<RestrictionT>() {}, restrictionT); } @VisibleForTesting static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod( - ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) { + ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) { // Method is of the form: // @NewTracker // TrackerT newTracker(RestrictionT restriction); Type[] params = m.getGenericParameterTypes(); errors.checkArgument(params.length == 1, "Must have a single argument"); - TypeToken<?> restrictionT = fnToken.resolveType(params[0]); - TypeToken<?> trackerT = fnToken.resolveType(m.getGenericReturnType()); - TypeToken<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT); + TypeDescriptor<?> restrictionT = fnT.resolveType(params[0]); + TypeDescriptor<?> trackerT = fnT.resolveType(m.getGenericReturnType()); + TypeDescriptor<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT); errors.checkArgument( trackerT.isSubtypeOf(expectedTrackerT), "Returns %s, but must return a subtype of %s", @@ -985,7 +996,7 @@ public class DoFnSignatures { return ReflectHelpers.METHOD_FORMATTER.apply(method); } - private static String formatType(TypeToken<?> t) { + private static String formatType(TypeDescriptor<?> t) { return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 68278c5..573701b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -22,7 +22,6 @@ import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.err import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.google.common.reflect.TypeToken; import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -403,12 +403,12 @@ public class DoFnSignaturesSplittableDoFnTest { "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>"); DoFnSignatures.analyzeSplitRestrictionMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), new AnonymousMethod() { void method( Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {} }.getMethod(), - TypeToken.of(Integer.class)); + TypeDescriptor.of(Integer.class)); } @Test @@ -422,14 +422,14 @@ public class DoFnSignaturesSplittableDoFnTest { thrown.expectMessage("First argument must be the element type Integer"); DoFnSignatures.analyzeSplitRestrictionMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), new AnonymousMethod() { void method( String element, SomeRestriction restriction, DoFn.OutputReceiver<SomeRestriction> receiver) {} }.getMethod(), - TypeToken.of(Integer.class)); + TypeDescriptor.of(Integer.class)); } @Test @@ -437,7 +437,7 @@ public class DoFnSignaturesSplittableDoFnTest { thrown.expectMessage("Must have exactly 3 arguments"); DoFnSignatures.analyzeSplitRestrictionMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), new AnonymousMethod() { private void method( Integer element, @@ -445,7 +445,7 @@ public class DoFnSignaturesSplittableDoFnTest { DoFn.OutputReceiver<SomeRestriction> receiver, Object extra) {} }.getMethod(), - TypeToken.of(Integer.class)); + TypeDescriptor.of(Integer.class)); } @Test @@ -519,7 +519,7 @@ public class DoFnSignaturesSplittableDoFnTest { thrown.expectMessage("Must have a single argument"); DoFnSignatures.analyzeNewTrackerMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), new AnonymousMethod() { private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) { return null; @@ -533,7 +533,7 @@ public class DoFnSignaturesSplittableDoFnTest { "Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker<String>"); DoFnSignatures.analyzeNewTrackerMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), new AnonymousMethod() { private SomeRestrictionTracker method(String restriction) { return null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index fe88c3b..52ecb2a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import com.google.common.reflect.TypeToken; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -66,12 +65,12 @@ public class DoFnSignaturesTest { DoFnSignatures.analyzeBundleMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), new DoFnSignaturesTestUtils.AnonymousMethod() { void method(DoFn<Integer, String>.Context c, int n) {} }.getMethod(), - TypeToken.of(Integer.class), - TypeToken.of(String.class)); + TypeDescriptor.of(Integer.class), + TypeDescriptor.of(String.class)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java index ce00f2d..49e2ba7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.transforms.reflect; -import com.google.common.reflect.TypeToken; import java.lang.reflect.Method; import java.util.Collections; import java.util.NoSuchElementException; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.TypeDescriptor; /** Utilities for use in {@link DoFnSignatures} tests. */ class DoFnSignaturesTestUtils { @@ -57,10 +57,10 @@ class DoFnSignaturesTestUtils { throws Exception { return DoFnSignatures.analyzeProcessElementMethod( errors(), - TypeToken.of(FakeDoFn.class), + TypeDescriptor.of(FakeDoFn.class), method.getMethod(), - TypeToken.of(Integer.class), - TypeToken.of(String.class), + TypeDescriptor.of(Integer.class), + TypeDescriptor.of(String.class), Collections.EMPTY_MAP, Collections.EMPTY_MAP); }