Repository: beam
Updated Branches:
  refs/heads/master 6d8590028 -> 69846f500


Convert Coder into an Abstract Static Class


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/72be5c71
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/72be5c71
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/72be5c71

Branch: refs/heads/master
Commit: 72be5c71561bf552c25e2de2b0d21aa374b17ec0
Parents: 6d85900
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 09:20:10 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 10:34:56 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  2 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  | 36 ++++++++++----------
 .../apache/beam/sdk/coders/CoderFactories.java  | 35 ++++++++++---------
 .../apache/beam/sdk/coders/StructuredCoder.java |  2 +-
 .../DoFnSignaturesSplittableDoFnTest.java       |  4 +--
 5 files changed, 40 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index e21b6de..c7142fe 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    
<dataflow.container_version>beam-master-20170504-2</dataflow.container_version>
+    
<dataflow.container_version>beam-master-20170505-wd-2914</dataflow.container_version>
     
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>

http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index c923719..169e448 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -57,10 +57,10 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of the values being transcoded
  */
-public interface Coder<T> extends Serializable {
+public abstract class Coder<T> implements Serializable {
   /** The context in which encoding or decoding is being done. */
   @Deprecated
-  class Context {
+  public static class Context {
     /**
      * The outer context: the value being encoded or decoded takes
      * up the remainder of the record/stream contents.
@@ -118,7 +118,7 @@ public interface Coder<T> extends Serializable {
    * for some reason
    * @throws CoderException if the value could not be encoded for some reason
    */
-  void encode(T value, OutputStream outStream)
+  public abstract void encode(T value, OutputStream outStream)
       throws CoderException, IOException;
 
   /**
@@ -130,7 +130,7 @@ public interface Coder<T> extends Serializable {
    * @throws CoderException if the value could not be encoded for some reason
    */
   @Deprecated
-  void encodeOuter(T value, OutputStream outStream)
+  public abstract void encodeOuter(T value, OutputStream outStream)
       throws CoderException, IOException;
 
   /**
@@ -142,7 +142,7 @@ public interface Coder<T> extends Serializable {
    * @throws CoderException if the value could not be encoded for some reason
    */
   @Deprecated
-  void encode(T value, OutputStream outStream, Context context)
+  public abstract void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException;
 
   /**
@@ -153,7 +153,7 @@ public interface Coder<T> extends Serializable {
    * for some reason
    * @throws CoderException if the value could not be decoded for some reason
    */
-  T decode(InputStream inStream) throws CoderException, IOException;
+  public abstract T decode(InputStream inStream) throws CoderException, 
IOException;
 
   /**
    * Decodes a value of type {@code T} from the given input stream in
@@ -164,7 +164,7 @@ public interface Coder<T> extends Serializable {
    * @throws CoderException if the value could not be decoded for some reason
    */
   @Deprecated
-  T decodeOuter(InputStream inStream) throws CoderException, IOException;
+  public abstract T decodeOuter(InputStream inStream) throws CoderException, 
IOException;
 
   /**
    * Decodes a value of type {@code T} from the given input stream in
@@ -175,7 +175,7 @@ public interface Coder<T> extends Serializable {
    * @throws CoderException if the value could not be decoded for some reason
    */
   @Deprecated
-  T decode(InputStream inStream, Context context)
+  public abstract T decode(InputStream inStream, Context context)
       throws CoderException, IOException;
 
   /**
@@ -184,7 +184,7 @@ public interface Coder<T> extends Serializable {
    * returns {@code null} if this cannot be done or this is not a
    * parameterized type.
    */
-  List<? extends Coder<?>> getCoderArguments();
+  public abstract List<? extends Coder<?>> getCoderArguments();
 
   /**
    * Throw {@link NonDeterministicException} if the coding is not 
deterministic.
@@ -202,7 +202,7 @@ public interface Coder<T> extends Serializable {
    *
    * @throws Coder.NonDeterministicException if this coder is not 
deterministic.
    */
-  void verifyDeterministic() throws Coder.NonDeterministicException;
+  public abstract void verifyDeterministic() throws 
Coder.NonDeterministicException;
 
   /**
    * Returns {@code true} if this {@link Coder} is injective with respect to 
{@link Objects#equals}.
@@ -214,7 +214,7 @@ public interface Coder<T> extends Serializable {
    * whenever {@code equals()} compares object identity, rather than 
performing a
    * semantic/structural comparison.
    */
-  boolean consistentWithEquals();
+  public abstract boolean consistentWithEquals();
 
   /**
    * Returns an object with an {@code Object.equals()} method that represents 
structural equality
@@ -234,7 +234,7 @@ public interface Coder<T> extends Serializable {
    *
    * <p>See also {@link #consistentWithEquals()}.
    */
-  Object structuralValue(T value);
+  public abstract Object structuralValue(T value);
 
   /**
    * Returns whether {@link #registerByteSizeObserver} cheap enough to
@@ -246,7 +246,7 @@ public interface Coder<T> extends Serializable {
    * {@link org.apache.beam.sdk.runners.PipelineRunner}
    * implementations.
    */
-  boolean isRegisterByteSizeObserverCheap(T value);
+  public abstract boolean isRegisterByteSizeObserverCheap(T value);
 
   /**
    * Returns whether {@link #registerByteSizeObserver} cheap enough to
@@ -259,7 +259,7 @@ public interface Coder<T> extends Serializable {
    * implementations.
    */
   @Deprecated
-  boolean isRegisterByteSizeObserverCheap(T value, Context context);
+  public abstract boolean isRegisterByteSizeObserverCheap(T value, Context 
context);
 
   /**
    * Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -269,7 +269,7 @@ public interface Coder<T> extends Serializable {
    * {@link org.apache.beam.sdk.runners.PipelineRunner}
    * implementations.
    */
-  void registerByteSizeObserver(
+  public abstract void registerByteSizeObserver(
       T value, ElementByteSizeObserver observer)
       throws Exception;
 
@@ -282,7 +282,7 @@ public interface Coder<T> extends Serializable {
    * implementations.
    */
   @Deprecated
-  void registerByteSizeObserver(
+  public abstract void registerByteSizeObserver(
       T value, ElementByteSizeObserver observer, Context context)
       throws Exception;
 
@@ -290,13 +290,13 @@ public interface Coder<T> extends Serializable {
    * Returns the {@link TypeDescriptor} for the type encoded.
    */
   @Experimental(Kind.CODER_TYPE_ENCODING)
-  TypeDescriptor<T> getEncodedTypeDescriptor();
+  public abstract TypeDescriptor<T> getEncodedTypeDescriptor();
 
   /**
    * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is
    * not deterministic, including details of why the encoding is not 
deterministic.
    */
-  class NonDeterministicException extends Exception {
+  public static class NonDeterministicException extends Exception {
     private Coder<?> coder;
     private List<String> reasons;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
index 0031698..2a1d792 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.base.MoreObjects;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -70,7 +72,12 @@ public final class CoderFactories {
    * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}.
    */
   public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
-    return new CoderFactoryFromStaticMethods(clazz);
+    checkArgument(
+        Coder.class.isAssignableFrom(clazz),
+        "%s is not a subtype of %s",
+        clazz.getName(),
+        Coder.class.getSimpleName());
+    return new CoderFactoryFromStaticMethods((Class<? extends Coder>) clazz);
   }
 
   /**
@@ -142,7 +149,7 @@ public final class CoderFactories {
      * Returns a CoderFactory that invokes the given static factory method
      * to create the Coder.
      */
-    private CoderFactoryFromStaticMethods(Class<?> coderClazz) {
+    private CoderFactoryFromStaticMethods(Class<? extends Coder> coderClazz) {
       this.factoryMethod = getFactoryMethod(coderClazz);
       this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
     }
@@ -203,8 +210,8 @@ public final class CoderFactories {
      * each corresponding to an argument of the {@code of}
      * method.
      */
-    private <T> Method getInstanceComponentsMethod(Class<?> coderClazz) {
-      TypeDescriptor<?> coderType = TypeDescriptor.of(coderClazz);
+    private <T, CoderT extends Coder> Method 
getInstanceComponentsMethod(Class<CoderT> coderClazz) {
+      TypeDescriptor<CoderT> coderType = TypeDescriptor.of(coderClazz);
       TypeDescriptor<T> argumentType = getCodedType(coderType);
 
       // getInstanceComponents may be implemented in a superclass,
@@ -235,19 +242,13 @@ public final class CoderFactories {
      * If {@code coderType} is a subclass of {@link Coder} for a specific
      * type {@code T}, returns {@code T.class}. Otherwise, raises 
IllegalArgumentException.
      */
-    private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<?> coderType) {
-      for (TypeDescriptor<?> ifaceType : coderType.getInterfaces()) {
-        if (ifaceType.getRawType().equals(Coder.class)) {
-          ParameterizedType coderIface = (ParameterizedType) 
ifaceType.getType();
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<T> token =
-              (TypeDescriptor<T>) 
TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
-          return token;
-        }
-      }
-      throw new IllegalArgumentException(
-          "cannot build CoderFactory from class " + coderType
-          + ": does not implement Coder<T> for any T.");
+    private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<? extends Coder> 
coderType) {
+      TypeDescriptor<?> coderSupertype = coderType.getSupertype(Coder.class);
+      ParameterizedType coderIface = (ParameterizedType) 
coderSupertype.getType();
+      @SuppressWarnings("unchecked")
+      TypeDescriptor<T> token =
+          (TypeDescriptor<T>) 
TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
+      return token;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index cc39429..0c72618 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *       expensive.</li>
  * </ul>
  */
-public abstract class StructuredCoder<T> implements Coder<T> {
+public abstract class StructuredCoder<T> extends Coder<T> {
   protected StructuredCoder() {}
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/72be5c71/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index b937e84..07b3348 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -25,8 +25,8 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
@@ -57,7 +57,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   private abstract static class SomeRestrictionTracker
       implements RestrictionTracker<SomeRestriction> {}
 
-  private abstract static class SomeRestrictionCoder implements 
Coder<SomeRestriction> {}
+  private abstract static class SomeRestrictionCoder extends 
StructuredCoder<SomeRestriction> {}
 
   @Test
   public void testHasRestrictionTracker() throws Exception {

Reply via email to