Repository: beam
Updated Branches:
  refs/heads/master 0b7515e5e -> 063fbd4ed


Add Create.Values#withType and Create#empty(TypeDescriptor)

This allows a Create transform that cannot infer a coder from its input
elements to be given a TypeDescriptor and use coder inference, rather than
requiring that a coder be provided explicitly.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c183a7bd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c183a7bd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c183a7bd

Branch: refs/heads/master
Commit: c183a7bd172044459e5f0255c447b882d5586409
Parents: 0b7515e
Author: Aviem Zur <aviem...@gmail.com>
Authored: Fri Feb 24 11:18:02 2017 +0200
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Feb 27 09:39:46 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Create.java  | 53 ++++++++++++++++++--
 .../apache/beam/sdk/transforms/CreateTest.java  | 47 +++++++++++++++++
 2 files changed, 96 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c183a7bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index d683907..4f746d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -89,6 +89,7 @@ import org.joda.time.Instant;
  *
  * @param <T> the type of the elements of the resulting {@code PCollection}
  */
+@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
 public class Create<T> {
   /**
    * Returns a new {@code Create.Values} transform that produces a
@@ -108,7 +109,7 @@ public class Create<T> {
    * Otherwise, use {@link Create.Values#withCoder} to set the coder 
explicitly.
    */
   public static <T> Values<T> of(Iterable<T> elems) {
-    return new Values<>(elems, Optional.<Coder<T>>absent());
+    return new Values<>(elems, Optional.<Coder<T>>absent(), 
Optional.<TypeDescriptor<T>>absent());
   }
 
   /**
@@ -148,7 +149,25 @@ public class Create<T> {
    * Instead, the {@code Coder} is provided via the {@code coder} argument.
    */
   public static <T> Values<T> empty(Coder<T> coder) {
-    return new Values<>(new ArrayList<T>(), Optional.of(coder));
+    return new Values<>(new ArrayList<T>(), Optional.of(coder),
+        Optional.<TypeDescriptor<T>>absent());
+  }
+
+  /**
+   * Returns a new {@code Create.Values} transform that produces
+   * an empty {@link PCollection}.
+   *
+   * <p>The elements will have a timestamp of negative infinity, see
+   * {@link Create#timestamped} for a way of creating a {@code PCollection}
+   * with timestamped elements.
+   *
+   * <p>Since there are no elements, the {@code Coder} cannot be automatically determined.
+   * Instead, when the output coder is requested, the {@code Coder} is looked up in the
+   * pipeline's {@link CoderRegistry} using the given {@code TypeDescriptor<T>}. Note that a
+   * default coder must be registered for the class described in the {@code TypeDescriptor<T>}.
+   *
+   * @param type the type of the (absent) elements, used to infer the output {@code Coder}
+   * @return a transform producing an empty {@code PCollection} whose coder is inferred from
+   *     {@code type}
+   */
+  public static <T> Values<T> empty(TypeDescriptor<T> type) {
+    return new Values<>(new ArrayList<T>(), Optional.<Coder<T>>absent(), 
Optional.of(type));
   }
 
   /**
@@ -251,7 +270,24 @@ public class Create<T> {
      * <p>Note that for {@link Create.Values} with no elements, the {@link 
VoidCoder} is used.
      */
     public Values<T> withCoder(Coder<T> coder) {
-      return new Values<>(elems, Optional.of(coder));
+      return new Values<>(elems, Optional.of(coder), typeDescriptor);
+    }
+
+    /**
+     * Returns a {@link Create.Values} PTransform like this one that uses the given
+     * {@code TypeDescriptor<T>} to determine the {@code Coder} used to encode and decode the
+     * elements as values of type {@code T}. Note that a default coder must be registered for the
+     * class described in the {@code TypeDescriptor<T>}.
+     *
+     * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
+     * if all elements have the same non-parameterized run-time class, and a default coder is
+     * registered for that class. See {@link CoderRegistry} for details on how defaults are
+     * determined.
+     *
+     * <p>A coder set explicitly via {@link #withCoder} takes precedence over this type. When no
+     * coder is set, the registry's default coder for this type is used instead of inferring a
+     * coder from the elements, so the type also applies when there are no elements.
+     *
+     * @param type the element type used to look up the default {@code Coder}
+     * @return a copy of this transform with the given type descriptor; the elements and any
+     *     explicitly set coder are retained
+     */
+    public Values<T> withType(TypeDescriptor<T> type) {
+      return new Values<>(elems, coder, Optional.of(type));
     }
 
     public Iterable<T> getElements() {
@@ -279,6 +315,8 @@ public class Create<T> {
     public Coder<T> getDefaultOutputCoder(PBegin input) throws 
CannotProvideCoderException {
       if (coder.isPresent()) {
         return coder.get();
+      } else if (typeDescriptor.isPresent()) {
+        return 
input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
       } else {
         return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), 
elems);
       }
@@ -292,15 +330,22 @@ public class Create<T> {
     /** The coder used to encode the values to and from a binary 
representation. */
     private final transient Optional<Coder<T>> coder;
 
+    /** The value type. */
+    private final transient Optional<TypeDescriptor<T>> typeDescriptor;
+
     /**
      * Constructs a {@code Create.Values} transform that produces a
      * {@link PCollection} containing the specified elements.
      *
      * <p>The arguments should not be modified after this is called.
      */
-    private Values(Iterable<T> elems, Optional<Coder<T>> coder) {
+    private Values(
+        Iterable<T> elems,
+        Optional<Coder<T>> coder,
+        Optional<TypeDescriptor<T>> typeDescriptor) {
       this.elems = elems;
       this.coder = coder;
+      this.typeDescriptor = typeDescriptor;
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/c183a7bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 93260cd..af917cf 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -55,8 +56,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -128,6 +131,17 @@ public class CreateTest {
   static class Record2 extends Record {
   }
 
+  private static class RecordCoder extends CustomCoder<Record> {
+    @Override
+    public void encode(Record value, OutputStream outStream, Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(InputStream inStream, Context context) throws 
CoderException, IOException {
+      return null;
+    }
+  }
+
   @Test
   public void testPolymorphicType() throws Exception {
     thrown.expect(RuntimeException.class);
@@ -316,6 +330,39 @@ public class CreateTest {
     assertEquals("Create.TimestampedValues", 
Create.timestamped(Collections.EMPTY_LIST).getName());
   }
 
+
+  @Test
+  public void testCreateDefaultOutputCoderUsingInference() throws Exception {
+    Coder<Record> coder = new RecordCoder();
+    p.getCoderRegistry().registerCoder(Record.class, coder);
+    PBegin pBegin = PBegin.in(p);
+    Create.Values<Record> values = Create.of(new Record(), new Record(), new 
Record());
+    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
+    assertThat(defaultCoder, equalTo(coder));
+  }
+
+  @Test
+  public void testCreateDefaultOutputCoderUsingCoder() throws Exception {
+    Coder<Record> coder = new RecordCoder();
+    PBegin pBegin = PBegin.in(p);
+    Create.Values<Record> values =
+        Create.of(new Record(), new Record(), new Record()).withCoder(coder);
+    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
+    assertThat(defaultCoder, equalTo(coder));
+  }
+
+  @Test
+  public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws 
Exception {
+    Coder<Record> coder = new RecordCoder();
+    p.getCoderRegistry().registerCoder(Record.class, coder);
+    PBegin pBegin = PBegin.in(p);
+    Create.Values<Record> values =
+        Create.of(new Record(), new Record(), new Record())
+            .withType(new TypeDescriptor<Record>() {});
+    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
+    assertThat(defaultCoder, equalTo(coder));
+  }
+
   @Test
   public void testSourceIsSerializableWithUnserializableElements() throws 
Exception {
     List<UnserializableRecord> elements =

Reply via email to