Converts AvroIO.Read to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/439f2ca0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/439f2ca0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/439f2ca0

Branch: refs/heads/master
Commit: 439f2ca03c0d8994e5736b9493f61d9cb4267cb2
Parents: ff7a1d4
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Apr 28 18:37:49 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java | 67 +++++++++-----------
 1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/439f2ca0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index ed172d1..2f1d917 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
@@ -130,12 +131,15 @@ public class AvroIO {
    * <p>The schema must be specified using one of the {@code withSchema} functions.
    */
   public static <T> Read<T> read() {
-    return new Read<>();
+    return new AutoValue_AvroIO_Read.Builder<T>().build();
   }
 
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
-    return new Read<>(null, null, GenericRecord.class, schema);
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .build();
   }
 
   /**
@@ -146,26 +150,21 @@ public class AvroIO {
   }
 
   /** Implementation of {@link #read}. */
-  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-    /** The filepattern to read from. */
-    @Nullable
-    final String filepattern;
-    /** The class type of the records. */
-    @Nullable
-    final Class<T> type;
-    /** The schema of the input file. */
-    @Nullable
-    final Schema schema;
-
-    Read() {
-      this(null, null, null, null);
-    }
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract String getFilepattern();
+    @Nullable abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+
+    abstract Builder<T> toBuilder();
 
-    Read(String name, String filepattern, Class<T> type, Schema schema) {
-      super(name);
-      this.filepattern = filepattern;
-      this.type = type;
-      this.schema = schema;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(String filepattern);
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Read<T> build();
     }
 
     /**
@@ -178,7 +177,7 @@ public class AvroIO {
      * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
     public Read<T> from(String filepattern) {
-      return new Read<>(name, filepattern, type, schema);
+      return toBuilder().setFilepattern(filepattern).build();
     }
 
     /**
@@ -187,26 +186,26 @@ public class AvroIO {
      * specified Avro-generated class.
      */
     public Read<T> withSchema(Class<T> type) {
-      return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
+      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (filepattern == null) {
+      if (getFilepattern() == null) {
         throw new IllegalStateException(
             "need to set the filepattern of an AvroIO.Read transform");
       }
-      if (schema == null) {
+      if (getSchema() == null) {
         throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
       }
 
       @SuppressWarnings("unchecked")
       Bounded<T> read =
-          type == GenericRecord.class
+          getRecordClass() == GenericRecord.class
               ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(filepattern).withSchema(schema))
+                  AvroSource.from(getFilepattern()).withSchema(getSchema()))
               : org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(filepattern).withSchema(type));
+                  AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
 
       PCollection<T> pcol = input.getPipeline().apply("Read", read);
       // Honor the default output coder that would have been used by this PTransform.
@@ -218,21 +217,13 @@ public class AvroIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-        .addIfNotNull(DisplayData.item("filePattern", filepattern)
+        .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
           .withLabel("Input File Pattern"));
     }
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return AvroCoder.of(type, schema);
-    }
-
-    public String getFilepattern() {
-      return filepattern;
-    }
-
-    public Schema getSchema() {
-      return schema;
+      return AvroCoder.of(getRecordClass(), getSchema());
     }
   }
 