Repository: beam Updated Branches: refs/heads/master 0b7515e5e -> 063fbd4ed
Add Create#withType(TypeDescriptor). This permits Create transforms that cannot infer a coder from their input elements to be given a TypeDescriptor and rely on coder inference, rather than requiring that a Coder be provided explicitly. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c183a7bd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c183a7bd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c183a7bd Branch: refs/heads/master Commit: c183a7bd172044459e5f0255c447b882d5586409 Parents: 0b7515e Author: Aviem Zur <aviem...@gmail.com> Authored: Fri Feb 24 11:18:02 2017 +0200 Committer: Thomas Groh <tg...@google.com> Committed: Mon Feb 27 09:39:46 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/Create.java | 53 ++++++++++++++++++-- .../apache/beam/sdk/transforms/CreateTest.java | 47 +++++++++++++++++ 2 files changed, 96 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c183a7bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index d683907..4f746d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -89,6 +89,7 @@ import org.joda.time.Instant; * * @param <T> the type of the elements of the resulting {@code PCollection} */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class Create<T> { /** * Returns a new {@code Create.Values} transform that produces a @@ -108,7 +109,7 @@ public class Create<T> { * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly. 
*/ public static <T> Values<T> of(Iterable<T> elems) { - return new Values<>(elems, Optional.<Coder<T>>absent()); + return new Values<>(elems, Optional.<Coder<T>>absent(), Optional.<TypeDescriptor<T>>absent()); } /** @@ -148,7 +149,25 @@ public class Create<T> { * Instead, the {@code Coder} is provided via the {@code coder} argument. */ public static <T> Values<T> empty(Coder<T> coder) { - return new Values<>(new ArrayList<T>(), Optional.of(coder)); + return new Values<>(new ArrayList<T>(), Optional.of(coder), + Optional.<TypeDescriptor<T>>absent()); + } + + /** + * Returns a new {@code Create.Values} transform that produces + * an empty {@link PCollection}. + * + * <p>The elements will have a timestamp of negative infinity, see + * {@link Create#timestamped} for a way of creating a {@code PCollection} + * with timestamped elements. + * + * <p>Since there are no elements, the {@code Coder} cannot be automatically determined. + * Instead, the {@code Coder} is determined from given {@code TypeDescriptor<T>}. + * Note that a default coder must be registered for the class described in the + * {@code TypeDescriptor<T>}. + */ + public static <T> Values<T> empty(TypeDescriptor<T> type) { + return new Values<>(new ArrayList<T>(), Optional.<Coder<T>>absent(), Optional.of(type)); } /** @@ -251,7 +270,24 @@ public class Create<T> { * <p>Note that for {@link Create.Values} with no elements, the {@link VoidCoder} is used. */ public Values<T> withCoder(Coder<T> coder) { - return new Values<>(elems, Optional.of(coder)); + return new Values<>(elems, Optional.of(coder), typeDescriptor); + } + + /** + * Returns a {@link Create.Values} PTransform like this one that uses the given + * {@code TypeDescriptor<T>} to determine the {@code Coder} to use to decode each of the + * objects into a value of type {@code T}. Note that a default coder must be registered for the + * class described in the {@code TypeDescriptor<T>}. 
+ * + * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use + * if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. + * + * <p>Note that for {@link Create.Values} with no elements, the {@link VoidCoder} is used. + */ + public Values<T> withType(TypeDescriptor<T> type) { + return new Values<>(elems, coder, Optional.of(type)); } public Iterable<T> getElements() { @@ -279,6 +315,8 @@ public class Create<T> { public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException { if (coder.isPresent()) { return coder.get(); + } else if (typeDescriptor.isPresent()) { + return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get()); } else { return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems); } @@ -292,15 +330,22 @@ public class Create<T> { /** The coder used to encode the values to and from a binary representation. */ private final transient Optional<Coder<T>> coder; + /** The value type. */ + private final transient Optional<TypeDescriptor<T>> typeDescriptor; + /** * Constructs a {@code Create.Values} transform that produces a * {@link PCollection} containing the specified elements. * * <p>The arguments should not be modified after this is called. 
*/ - private Values(Iterable<T> elems, Optional<Coder<T>> coder) { + private Values( + Iterable<T> elems, + Optional<Coder<T>> coder, + Optional<TypeDescriptor<T>> typeDescriptor) { this.elems = elems; this.coder = coder; + this.typeDescriptor = typeDescriptor; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/c183a7bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 93260cd..af917cf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -55,8 +56,10 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create.Values.CreateSource; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Rule; @@ -128,6 +131,17 @@ public class CreateTest { static class Record2 extends Record { } + private static class RecordCoder extends CustomCoder<Record> { + @Override + public void encode(Record value, OutputStream outStream, Context context) 
+ throws CoderException, IOException {} + + @Override + public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + return null; + } + } + @Test public void testPolymorphicType() throws Exception { thrown.expect(RuntimeException.class); @@ -316,6 +330,39 @@ public class CreateTest { assertEquals("Create.TimestampedValues", Create.timestamped(Collections.EMPTY_LIST).getName()); } + + @Test + public void testCreateDefaultOutputCoderUsingInference() throws Exception { + Coder<Record> coder = new RecordCoder(); + p.getCoderRegistry().registerCoder(Record.class, coder); + PBegin pBegin = PBegin.in(p); + Create.Values<Record> values = Create.of(new Record(), new Record(), new Record()); + Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin); + assertThat(defaultCoder, equalTo(coder)); + } + + @Test + public void testCreateDefaultOutputCoderUsingCoder() throws Exception { + Coder<Record> coder = new RecordCoder(); + PBegin pBegin = PBegin.in(p); + Create.Values<Record> values = + Create.of(new Record(), new Record(), new Record()).withCoder(coder); + Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin); + assertThat(defaultCoder, equalTo(coder)); + } + + @Test + public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws Exception { + Coder<Record> coder = new RecordCoder(); + p.getCoderRegistry().registerCoder(Record.class, coder); + PBegin pBegin = PBegin.in(p); + Create.Values<Record> values = + Create.of(new Record(), new Record(), new Record()) + .withType(new TypeDescriptor<Record>() {}); + Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin); + assertThat(defaultCoder, equalTo(coder)); + } + @Test public void testSourceIsSerializableWithUnserializableElements() throws Exception { List<UnserializableRecord> elements =