Converts AvroIO.Write to AutoValue; adds writeGenericRecords()

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0d74750
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0d74750
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0d74750

Branch: refs/heads/master
Commit: e0d74750da73658a067e7522f18c23c5e622fb2f
Parents: abb4916
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Apr 28 19:21:15 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 355 +++++--------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  19 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |   4 +-
 4 files changed, 105 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index c58d81e..7188dc5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -75,7 +75,7 @@ public class AvroPipelineTest {
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
-    input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema));
+    input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
     p.run().waitUntilFinish();
 
     List<GenericRecord> records = readGenericFile();

http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 08fc8a9..8cdd4e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -154,7 +154,30 @@ public class AvroIO {
    * pattern).
    */
   public static <T> Write<T> write() {
-    return new Write<>(null);
+    return new AutoValue_AvroIO_Write.Builder<T>()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setCodec(Write.DEFAULT_CODEC)
+        .setMetadata(ImmutableMap.<String, Object>of())
+        .setWindowedWrites(false)
+        .build();
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return AvroIO.<GenericRecord>write()
+        .toBuilder()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .build();
+  }
+
+  /**
+   * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
   }
 
   /** Implementation of {@link #read}. */
@@ -229,7 +252,8 @@ public class AvroIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
-  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
     /**
      * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
      * multiple Avro files matching a sharding pattern).
@@ -242,80 +266,38 @@ public class AvroIO {
     // This should be a multiple of 4 to not get a partial encoded byte.
     private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
-    /** The filename to write to. */
-    @Nullable
-    final String filenamePrefix;
-    /** Suffix to use for each filename. */
-    final String filenameSuffix;
-    /** Requested number of shards. 0 for automatic. */
-    final int numShards;
-    /** Shard template string. */
-    final String shardTemplate;
-    /** The class type of the records. */
-    final Class<T> type;
-    /** The schema of the output file. */
-    @Nullable
-    final Schema schema;
-    final boolean windowedWrites;
-    FileBasedSink.FilenamePolicy filenamePolicy;
-
+    @Nullable abstract String getFilenamePrefix();
+    abstract String getFilenameSuffix();
+    abstract int getNumShards();
+    abstract String getShardTemplate();
+    abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+    abstract boolean getWindowedWrites();
+    @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy();
     /**
      * The codec used to encode the blocks in the Avro file. String value drawn from those in
      * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
      */
-    final SerializableAvroCodecFactory codec;
+    abstract SerializableAvroCodecFactory getCodec();
     /** Avro file metadata. */
-    final ImmutableMap<String, Object> metadata;
-
-    Write(Class<T> type) {
-      this(
-          null,
-          null,
-          "",
-          0,
-          DEFAULT_SHARD_TEMPLATE,
-          type,
-          null,
-          DEFAULT_CODEC,
-          ImmutableMap.<String, Object>of(),
-          false,
-          null);
-    }
+    abstract ImmutableMap<String, Object> getMetadata();
 
-    Write(
-        String name,
-        String filenamePrefix,
-        String filenameSuffix,
-        int numShards,
-        String shardTemplate,
-        Class<T> type,
-        Schema schema,
-        SerializableAvroCodecFactory codec,
-        Map<String, Object> metadata,
-        boolean windowedWrites,
-        FileBasedSink.FilenamePolicy filenamePolicy) {
-      super(name);
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.type = type;
-      this.schema = schema;
-      this.codec = codec;
-      this.windowedWrites = windowedWrites;
-      this.filenamePolicy = filenamePolicy;
+    abstract Builder<T> toBuilder();
 
-      Map<String, String> badKeys = Maps.newLinkedHashMap();
-      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-        Object v = entry.getValue();
-        if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
-          badKeys.put(entry.getKey(), v.getClass().getSimpleName());
-        }
-      }
-      checkArgument(
-          badKeys.isEmpty(),
-          "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
-      this.metadata = ImmutableMap.copyOf(metadata);
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilenamePrefix(String filenamePrefix);
+      abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+      abstract Builder<T> setNumShards(int numShards);
+      abstract Builder<T> setShardTemplate(String shardTemplate);
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+      abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy);
+      abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
+      abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+
+      abstract Write<T> build();
     }
 
     /**
@@ -330,34 +312,12 @@ public class AvroIO {
      */
     public Write<T> to(String filenamePrefix) {
       validateOutputComponent(filenamePrefix);
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
     public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
     /**
@@ -367,18 +327,7 @@ public class AvroIO {
      */
     public Write<T> withSuffix(String filenameSuffix) {
       validateOutputComponent(filenameSuffix);
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
@@ -394,18 +343,7 @@ public class AvroIO {
      */
     public Write<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
@@ -415,18 +353,7 @@ public class AvroIO {
      * @see ShardNameTemplate
      */
     public Write<T> withShardNameTemplate(String shardTemplate) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
@@ -439,76 +366,19 @@ public class AvroIO {
     }
 
     public Write<T> withWindowedWrites() {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          true,
-          filenamePolicy);
+      return toBuilder().setWindowedWrites(true).build();
     }
 
     /**
      * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
      */
     public Write<T> withSchema(Class<T> type) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          ReflectData.get().getSchema(type),
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
-    }
-
-    /** Writes to Avro file(s) containing records of the specified schema. */
-    public Write<GenericRecord> withSchema(Schema schema) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          GenericRecord.class,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
-    }
-
-    /**
-     * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string
-     * form.
-     */
-    public Write<GenericRecord> withSchema(String schema) {
-      return withSchema((new Schema.Parser()).parse(schema));
+      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
     }
 
     /** Writes to Avro file(s) compressed using specified codec. */
     public Write<T> withCodec(CodecFactory codec) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          new SerializableAvroCodecFactory(codec),
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
     }
 
     /**
@@ -517,56 +387,56 @@ public class AvroIO {
      * <p>Supported value types are String, Long, and byte[].
      */
     public Write<T> withMetadata(Map<String, Object> metadata) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      Map<String, String> badKeys = Maps.newLinkedHashMap();
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        Object v = entry.getValue();
+        if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+          badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+        }
+      }
+      checkArgument(
+          badKeys.isEmpty(),
+          "Metadata value type must be one of String, Long, or byte[]. Found {}",
+          badKeys);
+      return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (filenamePolicy == null && filenamePrefix == null) {
+      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
         throw new IllegalStateException(
             "need to set the filename prefix of an AvroIO.Write transform");
       }
-      if (filenamePolicy != null && filenamePrefix != null) {
+      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
         throw new IllegalStateException(
             "cannot set both a filename policy and a filename prefix");
       }
-      if (schema == null) {
+      if (getSchema() == null) {
         throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
       }
 
       WriteFiles<T> write = null;
-      if (filenamePolicy != null) {
+      if (getFilenamePolicy() != null) {
         write = WriteFiles.to(
             new AvroSink<>(
-                filenamePolicy,
-                AvroCoder.of(type, schema),
-                codec,
-                metadata));
+                getFilenamePolicy(),
+                AvroCoder.of(getRecordClass(), getSchema()),
+                getCodec(),
+                getMetadata()));
       } else {
         write = WriteFiles.to(
             new AvroSink<>(
-                filenamePrefix,
-                filenameSuffix,
-                shardTemplate,
-                AvroCoder.of(type, schema),
-                codec,
-                metadata));
+                getFilenamePrefix(),
+                getFilenameSuffix(),
+                getShardTemplate(),
+                AvroCoder.of(getRecordClass(), getSchema()),
+                getCodec(),
+                getMetadata()));
       }
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
-      if (windowedWrites) {
+      if (getWindowedWrites()) {
         write = write.withWindowedWrites();
       }
       return input.apply("Write", write);
@@ -576,20 +446,20 @@ public class AvroIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .add(DisplayData.item("schema", type)
+          .add(DisplayData.item("schema", getRecordClass())
             .withLabel("Record Schema"))
-          .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+          .addIfNotNull(DisplayData.item("filePrefix", getFilenamePrefix())
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
               .withLabel("Output Shard Name Template"),
               DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
               .withLabel("Output File Suffix"),
               "")
-          .addIfNotDefault(DisplayData.item("numShards", numShards)
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"),
               0)
-          .addIfNotDefault(DisplayData.item("codec", codec.toString())
+          .addIfNotDefault(DisplayData.item("codec", getCodec().toString())
               .withLabel("Avro Compression Codec"),
               DEFAULT_CODEC.toString());
       builder.include("Metadata", new Metadata());
@@ -598,7 +468,7 @@ public class AvroIO {
     private class Metadata implements HasDisplayData {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
           DisplayData.Type type = DisplayData.inferType(entry.getValue());
           if (type != null) {
             builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
@@ -612,49 +482,10 @@ public class AvroIO {
       }
     }
 
-    /**
-     * Returns the current shard name template string.
-     */
-    public String getShardNameTemplate() {
-      return shardTemplate;
-    }
-
     @Override
     protected Coder<Void> getDefaultOutputCoder() {
       return VoidCoder.of();
     }
-
-    public String getFilenamePrefix() {
-      return filenamePrefix;
-    }
-
-    public String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    public int getNumShards() {
-      return numShards;
-    }
-
-    public String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-
-    public Class<T> getType() {
-      return type;
-    }
-
-    public Schema getSchema() {
-      return schema;
-    }
-
-    public CodecFactory getCodec() {
-      return codec.getCodec();
-    }
-
-    public Map<String, Object> getMetadata() {
-      return metadata;
-    }
   }
 
   // Pattern which matches old-style shard output patterns, which are now

http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 38984b5..4abd3e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -422,9 +422,9 @@ public class AvroIOTest {
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.deflateCodec(9));
 
-    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
-
-    assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString());
+    assertEquals(
+        CodecFactory.deflateCodec(9).toString(),
+        SerializableUtils.clone(write.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -434,9 +434,9 @@ public class AvroIOTest {
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.xzCodec(9));
 
-    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
-
-    assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
+    assertEquals(
+        CodecFactory.xzCodec(9).toString(),
+        SerializableUtils.clone(write.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -482,7 +482,7 @@ public class AvroIOTest {
     p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
     p.run();
 
-    String shardNameTemplate = write.getShardNameTemplate();
+    String shardNameTemplate = write.getShardTemplate();
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -580,9 +580,8 @@ public class AvroIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    AvroIO.Write<?> write = AvroIO.<GenericRecord>write()
-        .to(outputPath)
-        .withSchema(Schema.create(Schema.Type.STRING));
+    AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING))
+        .to(outputPath);
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("AvroIO.Write should include the file pattern in its primitive transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 51c9691..fb57d5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -275,12 +275,12 @@ public class AvroIOTransformTest {
                       generatedClass
                   },
                   new Object[] {
-                      AvroIO.write().withSchema(SCHEMA),
+                      AvroIO.writeGenericRecords(SCHEMA),
                       fromSchema
                   },
 
                   new Object[] {
-                      AvroIO.write().withSchema(SCHEMA_STRING),
+                      AvroIO.writeGenericRecords(SCHEMA_STRING),
                       fromSchemaString
                   })
               .build();

Reply via email to