Repository: beam Updated Branches: refs/heads/master 6d8590028 -> 69846f500
Convert Coder into an Abstract Static Class Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72be5c71 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72be5c71 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72be5c71 Branch: refs/heads/master Commit: 72be5c71561bf552c25e2de2b0d21aa374b17ec0 Parents: 6d85900 Author: Thomas Groh <tg...@google.com> Authored: Fri May 5 09:20:10 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri May 5 10:34:56 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 2 +- .../java/org/apache/beam/sdk/coders/Coder.java | 36 ++++++++++---------- .../apache/beam/sdk/coders/CoderFactories.java | 35 ++++++++++--------- .../apache/beam/sdk/coders/StructuredCoder.java | 2 +- .../DoFnSignaturesSplittableDoFnTest.java | 4 +-- 5 files changed, 40 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index e21b6de..c7142fe 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <dataflow.container_version>beam-master-20170504-2</dataflow.container_version> + <dataflow.container_version>beam-master-20170505-wd-2914</dataflow.container_version> <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index c923719..169e448 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -57,10 +57,10 @@ import org.apache.beam.sdk.values.TypeDescriptor; * * @param <T> the type of the values being transcoded */ -public interface Coder<T> extends Serializable { +public abstract class Coder<T> implements Serializable { /** The context in which encoding or decoding is being done. */ @Deprecated - class Context { + public static class Context { /** * The outer context: the value being encoded or decoded takes * up the remainder of the record/stream contents. @@ -118,7 +118,7 @@ public interface Coder<T> extends Serializable { * for some reason * @throws CoderException if the value could not be encoded for some reason */ - void encode(T value, OutputStream outStream) + public abstract void encode(T value, OutputStream outStream) throws CoderException, IOException; /** @@ -130,7 +130,7 @@ public interface Coder<T> extends Serializable { * @throws CoderException if the value could not be encoded for some reason */ @Deprecated - void encodeOuter(T value, OutputStream outStream) + public abstract void encodeOuter(T value, OutputStream outStream) throws CoderException, IOException; /** @@ -142,7 +142,7 @@ public interface Coder<T> extends Serializable { * @throws CoderException if the value could not be encoded for some reason */ @Deprecated - void encode(T value, OutputStream outStream, Context context) + public abstract void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException; /** @@ -153,7 +153,7 @@ public interface Coder<T> extends Serializable { * for some reason * @throws CoderException if the value could not be decoded for some reason */ - T decode(InputStream inStream) throws CoderException, IOException; + public abstract T decode(InputStream inStream) throws CoderException, IOException; /** * Decodes a value of type {@code T} from the given input stream in @@ -164,7 +164,7 @@ public interface Coder<T> extends Serializable { * @throws CoderException if the value could not be decoded for some reason */ @Deprecated - T decodeOuter(InputStream inStream) throws CoderException, IOException; + public abstract T decodeOuter(InputStream inStream) throws CoderException, IOException; /** * Decodes a value of type {@code T} from the given input stream in @@ -175,7 +175,7 @@ public interface Coder<T> extends Serializable { * @throws CoderException if the value could not be decoded for some reason */ @Deprecated - T decode(InputStream inStream, Context context) + public abstract T decode(InputStream inStream, Context context) throws CoderException, IOException; /** @@ -184,7 +184,7 @@ public interface Coder<T> extends Serializable { * returns {@code null} if this cannot be done or this is not a * parameterized type. */ - List<? extends Coder<?>> getCoderArguments(); + public abstract List<? extends Coder<?>> getCoderArguments(); /** * Throw {@link NonDeterministicException} if the coding is not deterministic. @@ -202,7 +202,7 @@ public interface Coder<T> extends Serializable { * * @throws Coder.NonDeterministicException if this coder is not deterministic. */ - void verifyDeterministic() throws Coder.NonDeterministicException; + public abstract void verifyDeterministic() throws Coder.NonDeterministicException; /** * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}. @@ -214,7 +214,7 @@ public interface Coder<T> extends Serializable { * whenever {@code equals()} compares object identity, rather than performing a * semantic/structural comparison. */ - boolean consistentWithEquals(); + public abstract boolean consistentWithEquals(); /** * Returns an object with an {@code Object.equals()} method that represents structural equality @@ -234,7 +234,7 @@ public interface Coder<T> extends Serializable { * * <p>See also {@link #consistentWithEquals()}. */ - Object structuralValue(T value); + public abstract Object structuralValue(T value); /** * Returns whether {@link #registerByteSizeObserver} cheap enough to @@ -246,7 +246,7 @@ public interface Coder<T> extends Serializable { * {@link org.apache.beam.sdk.runners.PipelineRunner} * implementations. */ - boolean isRegisterByteSizeObserverCheap(T value); + public abstract boolean isRegisterByteSizeObserverCheap(T value); /** * Returns whether {@link #registerByteSizeObserver} cheap enough to @@ -259,7 +259,7 @@ public interface Coder<T> extends Serializable { * implementations. */ @Deprecated - boolean isRegisterByteSizeObserverCheap(T value, Context context); + public abstract boolean isRegisterByteSizeObserverCheap(T value, Context context); /** * Notifies the {@code ElementByteSizeObserver} about the byte size @@ -269,7 +269,7 @@ public interface Coder<T> extends Serializable { * {@link org.apache.beam.sdk.runners.PipelineRunner} * implementations. */ - void registerByteSizeObserver( + public abstract void registerByteSizeObserver( T value, ElementByteSizeObserver observer) throws Exception; @@ -282,7 +282,7 @@ public interface Coder<T> extends Serializable { * implementations. */ @Deprecated - void registerByteSizeObserver( + public abstract void registerByteSizeObserver( T value, ElementByteSizeObserver observer, Context context) throws Exception; @@ -290,13 +290,13 @@ public interface Coder<T> extends Serializable { * Returns the {@link TypeDescriptor} for the type encoded. */ @Experimental(Kind.CODER_TYPE_ENCODING) - TypeDescriptor<T> getEncodedTypeDescriptor(); + public abstract TypeDescriptor<T> getEncodedTypeDescriptor(); /** * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is * not deterministic, including details of why the encoding is not deterministic. */ - class NonDeterministicException extends Exception { + public static class NonDeterministicException extends Exception { private Coder<?> coder; private List<String> reasons; http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java index 0031698..2a1d792 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.coders; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.MoreObjects; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -70,7 +72,12 @@ public final class CoderFactories { * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}. */ public static <T> CoderFactory fromStaticMethods(Class<T> clazz) { - return new CoderFactoryFromStaticMethods(clazz); + checkArgument( + Coder.class.isAssignableFrom(clazz), + "%s is not a subtype of %s", + clazz.getName(), + Coder.class.getSimpleName()); + return new CoderFactoryFromStaticMethods((Class<? extends Coder>) clazz); } /** @@ -142,7 +149,7 @@ public final class CoderFactories { * Returns a CoderFactory that invokes the given static factory method * to create the Coder. */ - private CoderFactoryFromStaticMethods(Class<?> coderClazz) { + private CoderFactoryFromStaticMethods(Class<? extends Coder> coderClazz) { this.factoryMethod = getFactoryMethod(coderClazz); this.getComponentsMethod = getInstanceComponentsMethod(coderClazz); } @@ -203,8 +210,8 @@ public final class CoderFactories { * each corresponding to an argument of the {@code of} * method. */ - private <T> Method getInstanceComponentsMethod(Class<?> coderClazz) { - TypeDescriptor<?> coderType = TypeDescriptor.of(coderClazz); + private <T, CoderT extends Coder> Method getInstanceComponentsMethod(Class<CoderT> coderClazz) { + TypeDescriptor<CoderT> coderType = TypeDescriptor.of(coderClazz); TypeDescriptor<T> argumentType = getCodedType(coderType); // getInstanceComponents may be implemented in a superclass, @@ -235,19 +242,13 @@ public final class CoderFactories { * If {@code coderType} is a subclass of {@link Coder} for a specific * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException. */ - private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<?> coderType) { - for (TypeDescriptor<?> ifaceType : coderType.getInterfaces()) { - if (ifaceType.getRawType().equals(Coder.class)) { - ParameterizedType coderIface = (ParameterizedType) ifaceType.getType(); - @SuppressWarnings("unchecked") - TypeDescriptor<T> token = - (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]); - return token; - } - } - throw new IllegalArgumentException( - "cannot build CoderFactory from class " + coderType - + ": does not implement Coder<T> for any T."); + private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<? extends Coder> coderType) { + TypeDescriptor<?> coderSupertype = coderType.getSupertype(Coder.class); + ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType(); + @SuppressWarnings("unchecked") + TypeDescriptor<T> token = + (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]); + return token; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index cc39429..0c72618 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -43,7 +43,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; * expensive.</li> * </ul> */ -public abstract class StructuredCoder<T> implements Coder<T> { +public abstract class StructuredCoder<T> extends Coder<T> { protected StructuredCoder() {} /** http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index b937e84..07b3348 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -25,8 +25,8 @@ import static org.junit.Assert.assertTrue; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import java.util.List; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; @@ -57,7 +57,7 @@ public class DoFnSignaturesSplittableDoFnTest { private abstract static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {} - private abstract static class SomeRestrictionCoder implements Coder<SomeRestriction> {} + private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {} @Test public void testHasRestrictionTracker() throws Exception {