Converts AvroIO.Read to AutoValue

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/439f2ca0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/439f2ca0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/439f2ca0

Branch: refs/heads/master
Commit: 439f2ca03c0d8994e5736b9493f61d9cb4267cb2
Parents: ff7a1d4
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Apr 28 18:37:49 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 67 +++++++++-----------
 1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/439f2ca0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index ed172d1..2f1d917 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
@@ -130,12 +131,15 @@ public class AvroIO {
    * <p>The schema must be specified using one of the {@code withSchema} functions.
    */
   public static <T> Read<T> read() {
-    return new Read<>();
+    return new AutoValue_AvroIO_Read.Builder<T>().build();
   }
 
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
-    return new Read<>(null, null, GenericRecord.class, schema);
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .build();
   }
 
   /**
@@ -146,26 +150,21 @@ public class AvroIO {
   }
 
   /** Implementation of {@link #read}. */
-  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-    /** The filepattern to read from. */
-    @Nullable
-    final String filepattern;
-    /** The class type of the records. */
-    @Nullable
-    final Class<T> type;
-    /** The schema of the input file. */
-    @Nullable
-    final Schema schema;
-
-    Read() {
-      this(null, null, null, null);
-    }
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract String getFilepattern();
+    @Nullable abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+
+    abstract Builder<T> toBuilder();
 
-    Read(String name, String filepattern, Class<T> type, Schema schema) {
-      super(name);
-      this.filepattern = filepattern;
-      this.type = type;
-      this.schema = schema;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(String filepattern);
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Read<T> build();
     }
 
     /**
@@ -178,7 +177,7 @@ public class AvroIO {
      * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
     public Read<T> from(String filepattern) {
-      return new Read<>(name, filepattern, type, schema);
+      return toBuilder().setFilepattern(filepattern).build();
     }
 
     /**
@@ -187,26 +186,26 @@ public class AvroIO {
      * specified Avro-generated class.
      */
     public Read<T> withSchema(Class<T> type) {
-      return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
+      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (filepattern == null) {
+      if (getFilepattern() == null) {
         throw new IllegalStateException(
             "need to set the filepattern of an AvroIO.Read transform");
       }
-      if (schema == null) {
+      if (getSchema() == null) {
         throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
       }
 
       @SuppressWarnings("unchecked")
       Bounded<T> read =
-          type == GenericRecord.class
+          getRecordClass() == GenericRecord.class
               ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(filepattern).withSchema(schema))
+                  AvroSource.from(getFilepattern()).withSchema(getSchema()))
               : org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(filepattern).withSchema(type));
+                  AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
 
       PCollection<T> pcol = input.getPipeline().apply("Read", read);
       // Honor the default output coder that would have been used by this PTransform.
@@ -218,21 +217,13 @@ public class AvroIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-        .addIfNotNull(DisplayData.item("filePattern", filepattern)
+        .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
           .withLabel("Input File Pattern"));
     }
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return AvroCoder.of(type, schema);
-    }
-
-    public String getFilepattern() {
-      return filepattern;
-    }
-
-    public Schema getSchema() {
-      return schema;
+      return AvroCoder.of(getRecordClass(), getSchema());
     }
   }
 

Reply via email to