Removes AvroIO.Write.Bound

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1dfd4e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1dfd4e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1dfd4e2

Branch: refs/heads/master
Commit: d1dfd4e2a8b82451f28f1f0e6f261eae0d51bb5b
Parents: 439f2ca
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Apr 28 18:59:03 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 910 ++++++++-----------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  37 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  12 +-
 4 files changed, 385 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 62db14f..c58d81e 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -75,7 +75,7 @@ public class AvroPipelineTest {
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
-    
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
+    
input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema));
     p.run().waitUntilFinish();
 
     List<GenericRecord> records = readGenericFile();

http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 2f1d917..4bde6ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -70,24 +70,24 @@ import org.apache.beam.sdk.values.PDone;
  * // A Read from a GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.Read
+ *     p.apply(AvroIO.read()
  *                .from("gs://my_bucket/path/to/records-*.avro")
  *                .withSchema(schema));
  * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify
+ * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to 
specify
  * the path of the file to write to (e.g., a local filename or sharded
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). {@link 
AvroIO.Write#to(FileBasedSink.FilenamePolicy)}
+ * {@code "gs://<bucket>/<filepath>"}). {@code 
AvroIO.write().to(FileBasedSink.FilenamePolicy)}
  * can also be used to specify a custom file naming policy.
  *
  * <p>By default, all input is put into the global window before writing. If 
per-window writes are
  * desired - for example, when using a streaming runner -
- * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and 
triggering to be
+ * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and 
triggering to be
  * preserved. When producing windowed writes, the number of output shards must 
be set explicitly
- * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set 
this for you to a
+ * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this 
for you to a
  * runner-chosen value, so you may need not set it yourself. A
  * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and 
triggers must produce
  * unique filenames.
@@ -103,13 +103,13 @@ import org.apache.beam.sdk.values.PDone;
  * <pre> {@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.Write.to("/path/to/file.avro")
+ * records.apply(AvroIO.write().to("/path/to/file.avro")
  *                           .withSchema(AvroAutoGenClass.class));
  *
  * // A Write to a sharded GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.Write
+ * records.apply("WriteToAvro", AvroIO.write()
  *     .to("gs://my_bucket/path/to/numbers")
  *     .withSchema(schema)
  *     .withSuffix(".avro"));
@@ -149,6 +149,14 @@ public class AvroIO {
     return readGenericRecords(new Schema.Parser().parse(schema));
   }
 
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files 
matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write() {
+    return new Write<>(null);
+  }
+
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
@@ -229,45 +237,161 @@ public class AvroIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A root {@link PTransform} that writes a {@link PCollection} to an Avro 
file (or
-   * multiple Avro files matching a sharding pattern).
-   */
-  public static class Write {
+  /** Implementation of {@link #write}. */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    /**
+     * A {@link PTransform} that writes a bounded {@link PCollection} to an 
Avro file (or
+     * multiple Avro files matching a sharding pattern).
+     *
+     * @param <T> the type of each of the elements of the input PCollection
+     */
+    private static final String DEFAULT_SHARD_TEMPLATE = 
ShardNameTemplate.INDEX_OF_MAX;
+    private static final SerializableAvroCodecFactory DEFAULT_CODEC =
+        new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+    // This should be a multiple of 4 to not get a partial encoded byte.
+    private static final int METADATA_BYTES_MAX_LENGTH = 40;
+
+    /** The filename to write to. */
+    @Nullable
+    final String filenamePrefix;
+    /** Suffix to use for each filename. */
+    final String filenameSuffix;
+    /** Requested number of shards. 0 for automatic. */
+    final int numShards;
+    /** Shard template string. */
+    final String shardTemplate;
+    /** The class type of the records. */
+    final Class<T> type;
+    /** The schema of the output file. */
+    @Nullable
+    final Schema schema;
+    final boolean windowedWrites;
+    FileBasedSink.FilenamePolicy filenamePolicy;
 
     /**
-     * Returns a {@link PTransform} that writes to the file(s)
-     * with the given prefix. This can be a local filename
+     * The codec used to encode the blocks in the Avro file. String value 
drawn from those in
+     * 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
+     */
+    final SerializableAvroCodecFactory codec;
+    /** Avro file metadata. */
+    final ImmutableMap<String, Object> metadata;
+
+    Write(Class<T> type) {
+      this(
+          null,
+          null,
+          "",
+          0,
+          DEFAULT_SHARD_TEMPLATE,
+          type,
+          null,
+          DEFAULT_CODEC,
+          ImmutableMap.<String, Object>of(),
+          false,
+          null);
+    }
+
+    Write(
+        String name,
+        String filenamePrefix,
+        String filenameSuffix,
+        int numShards,
+        String shardTemplate,
+        Class<T> type,
+        Schema schema,
+        SerializableAvroCodecFactory codec,
+        Map<String, Object> metadata,
+        boolean windowedWrites,
+        FileBasedSink.FilenamePolicy filenamePolicy) {
+      super(name);
+      this.filenamePrefix = filenamePrefix;
+      this.filenameSuffix = filenameSuffix;
+      this.numShards = numShards;
+      this.shardTemplate = shardTemplate;
+      this.type = type;
+      this.schema = schema;
+      this.codec = codec;
+      this.windowedWrites = windowedWrites;
+      this.filenamePolicy = filenamePolicy;
+
+      Map<String, String> badKeys = Maps.newLinkedHashMap();
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        Object v = entry.getValue();
+        if (!(v instanceof String || v instanceof Long || v instanceof 
byte[])) {
+          badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+        }
+      }
+      checkArgument(
+          badKeys.isEmpty(),
+          "Metadata value type must be one of String, Long, or byte[]. Found 
{}", badKeys);
+      this.metadata = ImmutableMap.copyOf(metadata);
+    }
+
+    /**
+     * Writes to the file(s) with the given prefix. This can be a local 
filename
      * (if running locally), or a Google Cloud Storage filename of
      * the form {@code "gs://<bucket>/<filepath>"}
      * (if running locally or using remote execution).
      *
      * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link Bound#withNumShards}, and end
-     * in a common extension, if given by {@link Bound#withSuffix}.
+     * a shard identifier (see {@link #withNumShards}, and end
+     * in a common extension, if given by {@link #withSuffix}.
      */
-    public static Bound<GenericRecord> to(String prefix) {
-      return new Bound<>(GenericRecord.class).to(prefix);
+    public Write<T> to(String filenamePrefix) {
+      validateOutputComponent(filenamePrefix);
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
-    /**
-     * Returns a {@link PTransform} that writes to the file(s) specified by 
the provided
-     * {@link FileBasedSink.FilenamePolicy}.
-     */
-    public static Bound<GenericRecord> to(FileBasedSink.FilenamePolicy 
filenamePolicy) {
-      return new Bound<>(GenericRecord.class).to(filenamePolicy);
+    /** Writes to the file(s) specified by the provided {@link 
FileBasedSink.FilenamePolicy}. */
+    public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes to the file(s) with the
-     * given filename suffix.
+     * Writes to the file(s) with the given filename suffix.
+     *
+     * <p>See {@link ShardNameTemplate} for a description of shard templates.
      */
-    public static Bound<GenericRecord> withSuffix(String filenameSuffix) {
-      return new Bound<>(GenericRecord.class).withSuffix(filenameSuffix);
+    public Write<T> withSuffix(String filenameSuffix) {
+      validateOutputComponent(filenameSuffix);
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that uses the provided shard count.
+     * Uses the provided shard count.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -275,585 +399,271 @@ public class AvroIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
+     * @see ShardNameTemplate
      */
-    public static Bound<GenericRecord> withNumShards(int numShards) {
-      return new Bound<>(GenericRecord.class).withNumShards(numShards);
+    public Write<T> withNumShards(int numShards) {
+      checkArgument(numShards >= 0);
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that uses the given shard name
-     * template.
+     * Returns a new {@link PTransform} that's like this one but
+     * that uses the given shard name template.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * @see ShardNameTemplate
      */
-    public static Bound<GenericRecord> withShardNameTemplate(String 
shardTemplate) {
-      return new 
Bound<>(GenericRecord.class).withShardNameTemplate(shardTemplate);
+    public Write<T> withShardNameTemplate(String shardTemplate) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that forces a single file as
-     * output.
+     * Forces a single file as output.
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
+     * <p>This is a shortcut for {@code 
.withNumShards(1).withShardNameTemplate("")}
      */
-    public static Bound<GenericRecord> withoutSharding() {
-      return new Bound<>(GenericRecord.class).withoutSharding();
+    public Write<T> withoutSharding() {
+      return withNumShards(1).withShardNameTemplate("");
     }
 
-    /**
-     * Returns a {@link PTransform} that writes Avro file(s)
-     * containing records whose type is the specified Avro-generated class.
-     *
-     * @param <T> the type of the elements of the input PCollection
-     */
-    public static <T> Bound<T> withSchema(Class<T> type) {
-      return new Bound<>(type).withSchema(type);
+    public Write<T> withWindowedWrites() {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          true,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes Avro file(s)
-     * containing records of the specified schema.
+     * Writes to Avro file(s) containing records whose type is the specified 
Avro-generated class.
      */
-    public static Bound<GenericRecord> withSchema(Schema schema) {
-      return new Bound<>(GenericRecord.class).withSchema(schema);
+    public Write<T> withSchema(Class<T> type) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          ReflectData.get().getSchema(type),
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
-    /**
-     * Returns a {@link PTransform} that writes Avro file(s)
-     * containing records of the specified schema in a JSON-encoded
-     * string form.
-     */
-    public static Bound<GenericRecord> withSchema(String schema) {
-      return withSchema((new Schema.Parser()).parse(schema));
+    /** Writes to Avro file(s) containing records of the specified schema. */
+    public Write<GenericRecord> withSchema(Schema schema) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          GenericRecord.class,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes Avro file(s) that has GCS path 
validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS output location does
-     * not exist at the pipeline creation time, but is expected to be available
-     * at execution time.
+     * Writes to Avro file(s) containing records of the specified schema in a 
JSON-encoded string
+     * form.
      */
-    public static Bound<GenericRecord> withoutValidation() {
-      return new Bound<>(GenericRecord.class).withoutValidation();
+    public Write<GenericRecord> withSchema(String schema) {
+      return withSchema((new Schema.Parser()).parse(schema));
     }
 
-    /**
-     * Returns a {@link PTransform} that writes Avro file(s) using specified 
codec.
-     */
-    public static Bound<GenericRecord> withCodec(CodecFactory codec) {
-      return new Bound<>(GenericRecord.class).withCodec(codec);
+    /** Writes to Avro file(s) compressed using specified codec. */
+    public Write<T> withCodec(CodecFactory codec) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          new SerializableAvroCodecFactory(codec),
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes Avro file(s) with the 
specified metadata.
+     * Writes to Avro file(s) with the specified metadata.
      *
      * <p>Supported value types are String, Long, and byte[].
      */
-    public static Bound<GenericRecord> withMetadata(Map<String, Object> 
metadata) {
-      return new Bound<>(GenericRecord.class).withMetadata(metadata);
+    public Write<T> withMetadata(Map<String, Object> metadata) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
-    /**
-     * A {@link PTransform} that writes a bounded {@link PCollection} to an 
Avro file (or
-     * multiple Avro files matching a sharding pattern).
-     *
-     * @param <T> the type of each of the elements of the input PCollection
-     */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      private static final String DEFAULT_SHARD_TEMPLATE = 
ShardNameTemplate.INDEX_OF_MAX;
-      private static final SerializableAvroCodecFactory DEFAULT_CODEC =
-          new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
-      // This should be a multiple of 4 to not get a partial encoded byte.
-      private static final int METADATA_BYTES_MAX_LENGTH = 40;
-
-      /** The filename to write to. */
-      @Nullable
-      final String filenamePrefix;
-      /** Suffix to use for each filename. */
-      final String filenameSuffix;
-      /** Requested number of shards. 0 for automatic. */
-      final int numShards;
-      /** Shard template string. */
-      final String shardTemplate;
-      /** The class type of the records. */
-      final Class<T> type;
-      /** The schema of the output file. */
-      @Nullable
-      final Schema schema;
-      final boolean windowedWrites;
-      FileBasedSink.FilenamePolicy filenamePolicy;
-
-      /**
-       * The codec used to encode the blocks in the Avro file. String value 
drawn from those in
-       * 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
-       */
-      final SerializableAvroCodecFactory codec;
-      /** Avro file metadata. */
-      final ImmutableMap<String, Object> metadata;
-
-      Bound(Class<T> type) {
-        this(
-            null,
-            null,
-            "",
-            0,
-            DEFAULT_SHARD_TEMPLATE,
-            type,
-            null,
-            DEFAULT_CODEC,
-            ImmutableMap.<String, Object>of(),
-            false,
-            null);
-      }
-
-      Bound(
-          String name,
-          String filenamePrefix,
-          String filenameSuffix,
-          int numShards,
-          String shardTemplate,
-          Class<T> type,
-          Schema schema,
-          SerializableAvroCodecFactory codec,
-          Map<String, Object> metadata,
-          boolean windowedWrites,
-          FileBasedSink.FilenamePolicy filenamePolicy) {
-        super(name);
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.type = type;
-        this.schema = schema;
-        this.codec = codec;
-        this.windowedWrites = windowedWrites;
-        this.filenamePolicy = filenamePolicy;
-
-        Map<String, String> badKeys = Maps.newLinkedHashMap();
-        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-          Object v = entry.getValue();
-          if (!(v instanceof String || v instanceof Long || v instanceof 
byte[])) {
-            badKeys.put(entry.getKey(), v.getClass().getSimpleName());
-          }
-        }
-        checkArgument(
-            badKeys.isEmpty(),
-            "Metadata value type must be one of String, Long, or byte[]. Found 
{}", badKeys);
-        this.metadata = ImmutableMap.copyOf(metadata);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link AvroIO.Write#to(String)} for more information
-       * about filenames.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      public Bound<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>See {@link ShardNameTemplate} for a description of shard templates.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withSuffix(String filenameSuffix) {
-        validateOutputComponent(filenameSuffix);
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withNumShards(int numShards) {
-        checkArgument(numShards >= 0);
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutSharding() {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            1,
-            "",
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      public Bound<T> withWindowedWrites() {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            true,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) containing records whose type is the
-       * specified Avro-generated class.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input PCollection
-       */
-      public <X> Bound<X> withSchema(Class<X> type) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            ReflectData.get().getSchema(type),
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (filenamePolicy == null && filenamePrefix == null) {
+        throw new IllegalStateException(
+            "need to set the filename prefix of an AvroIO.Write transform");
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) containing records of the specified
-       * schema.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(Schema schema) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            GenericRecord.class,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      if (filenamePolicy != null && filenamePrefix != null) {
+        throw new IllegalStateException(
+            "cannot set both a filename policy and a filename prefix");
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) containing records of the specified
-       * schema in a JSON-encoded string form.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(String schema) {
-        return withSchema((new Schema.Parser()).parse(schema));
+      if (schema == null) {
+        throw new IllegalStateException("need to set the schema of an 
AvroIO.Write transform");
       }
 
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that has GCS output path validation on pipeline creation disabled.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>This can be useful in the case where the GCS output location does
-       * not exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      WriteFiles<T> write = null;
+      if (filenamePolicy != null) {
+        write = WriteFiles.to(
+            new AvroSink<>(
+                filenamePolicy,
+                AvroCoder.of(type, schema),
+                codec,
+                metadata));
+      } else {
+        write = WriteFiles.to(
+            new AvroSink<>(
+                filenamePrefix,
+                filenameSuffix,
+                shardTemplate,
+                AvroCoder.of(type, schema),
+                codec,
+                metadata));
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) compressed using specified codec.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withCodec(CodecFactory codec) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            new SerializableAvroCodecFactory(codec),
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      if (getNumShards() > 0) {
+        write = write.withNumShards(getNumShards());
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) with the specified metadata.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withMetadata(Map<String, Object> metadata) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      if (windowedWrites) {
+        write = write.withWindowedWrites();
       }
+      return input.apply("Write", write);
+    }
 
-      @Override
-      public PDone expand(PCollection<T> input) {
-        if (filenamePolicy == null && filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of an AvroIO.Write transform");
-        }
-        if (filenamePolicy != null && filenamePrefix != null) {
-          throw new IllegalStateException(
-              "cannot set both a filename policy and a filename prefix");
-        }
-        if (schema == null) {
-          throw new IllegalStateException("need to set the schema of an 
AvroIO.Write transform");
-        }
-
-        WriteFiles<T> write = null;
-        if (filenamePolicy != null) {
-          write = WriteFiles.to(
-              new AvroSink<>(
-                  filenamePolicy,
-                  AvroCoder.of(type, schema),
-                  codec,
-                  metadata));
-        } else {
-          write = WriteFiles.to(
-              new AvroSink<>(
-                  filenamePrefix,
-                  filenameSuffix,
-                  shardTemplate,
-                  AvroCoder.of(type, schema),
-                  codec,
-                  metadata));
-        }
-        if (getNumShards() > 0) {
-          write = write.withNumShards(getNumShards());
-        }
-        if (windowedWrites) {
-          write = write.withWindowedWrites();
-        }
-        return input.apply("Write", write);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(DisplayData.item("schema", type)
+            .withLabel("Record Schema"))
+          .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+            .withLabel("Output File Prefix"))
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+              .withLabel("Output Shard Name Template"),
+              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+              .withLabel("Output File Suffix"),
+              "")
+          .addIfNotDefault(DisplayData.item("numShards", numShards)
+              .withLabel("Maximum Output Shards"),
+              0)
+          .addIfNotDefault(DisplayData.item("codec", codec.toString())
+              .withLabel("Avro Compression Codec"),
+              DEFAULT_CODEC.toString());
+      builder.include("Metadata", new Metadata());
+    }
 
+    private class Metadata implements HasDisplayData {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-            .add(DisplayData.item("schema", type)
-              .withLabel("Record Schema"))
-            .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
-              .withLabel("Output File Prefix"))
-            .addIfNotDefault(DisplayData.item("shardNameTemplate", 
shardTemplate)
-                .withLabel("Output Shard Name Template"),
-                DEFAULT_SHARD_TEMPLATE)
-            .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
-                .withLabel("Output File Suffix"),
-                "")
-            .addIfNotDefault(DisplayData.item("numShards", numShards)
-                .withLabel("Maximum Output Shards"),
-                0)
-            .addIfNotDefault(DisplayData.item("codec", codec.toString())
-                .withLabel("Avro Compression Codec"),
-                DEFAULT_CODEC.toString());
-        builder.include("Metadata", new Metadata());
-      }
-
-      private class Metadata implements HasDisplayData {
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-            DisplayData.Type type = DisplayData.inferType(entry.getValue());
-            if (type != null) {
-              builder.add(DisplayData.item(entry.getKey(), type, 
entry.getValue()));
-            } else {
-              String base64 = BaseEncoding.base64().encode((byte[]) 
entry.getValue());
-              String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
-                  ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + 
"...";
-              builder.add(DisplayData.item(entry.getKey(), repr));
-            }
+        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+          DisplayData.Type type = DisplayData.inferType(entry.getValue());
+          if (type != null) {
+            builder.add(DisplayData.item(entry.getKey(), type, 
entry.getValue()));
+          } else {
+            String base64 = BaseEncoding.base64().encode((byte[]) 
entry.getValue());
+            String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
+                ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + 
"...";
+            builder.add(DisplayData.item(entry.getKey(), repr));
           }
         }
       }
+    }
 
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
-
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
+    /**
+     * Returns the current shard name template string.
+     */
+    public String getShardNameTemplate() {
+      return shardTemplate;
+    }
 
-      public String getFilenamePrefix() {
-        return filenamePrefix;
-      }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
+    public String getFilenamePrefix() {
+      return filenamePrefix;
+    }
 
-      public int getNumShards() {
-        return numShards;
-      }
+    public String getShardTemplate() {
+      return shardTemplate;
+    }
 
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
+    public int getNumShards() {
+      return numShards;
+    }
 
-      public Class<T> getType() {
-        return type;
-      }
+    public String getFilenameSuffix() {
+      return filenameSuffix;
+    }
 
-      public Schema getSchema() {
-        return schema;
-      }
+    public Class<T> getType() {
+      return type;
+    }
 
-      public CodecFactory getCodec() {
-        return codec.getCodec();
-      }
+    public Schema getSchema() {
+      return schema;
+    }
 
-      public Map<String, Object> getMetadata() {
-        return metadata;
-      }
+    public CodecFactory getCodec() {
+      return codec.getCodec();
     }
 
-    /** Disallow construction of utility class. */
-    private Write() {}
+    public Map<String, Object> getMetadata() {
+      return metadata;
+    }
   }
 
   // Pattern which matches old-style shard output patterns, which are now

http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 2144b0d..7df1b18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -51,7 +51,6 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.AvroIO.Write.Bound;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -104,7 +103,7 @@ public class AvroIOTest {
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", 
AvroIO.read().from("gs://bucket/foo*/baz").getName());
-    assertEquals("AvroIO.Write", 
AvroIO.Write.to("gs://bucket/foo/baz").getName());
+    assertEquals("AvroIO.Write", 
AvroIO.write().to("gs://bucket/foo/baz").getName());
   }
 
   @DefaultCoder(AvroCoder.class)
@@ -145,7 +144,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
           .withoutSharding()
           .withSchema(GenericClass.class));
     p.run();
@@ -169,7 +168,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withCodec(CodecFactory.deflateCodec(9))
             .withSchema(GenericClass.class));
@@ -196,7 +195,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withSchema(GenericClass.class)
             .withCodec(CodecFactory.nullCodec()));
@@ -264,7 +263,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
           .withoutSharding()
           .withSchema(GenericClass.class));
     p.run();
@@ -372,7 +371,7 @@ public class AvroIOTest {
     windowedAvroWritePipeline
         .apply(values)
         
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix))
+        .apply(AvroIO.<GenericClass>write().to(new 
WindowedFilenamePolicy(outputFilePrefix))
             .withWindowedWrites()
             .withNumShards(2)
             .withSchema(GenericClass.class));
@@ -407,14 +406,14 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz");
     assertEquals(CodecFactory.deflateCodec(6).toString(), 
write.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -423,11 +422,11 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.deflateCodec(9));
 
-    AvroIO.Write.Bound<GenericRecord> serdeWrite = 
SerializableUtils.clone(write);
+    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
 
     assertEquals(CodecFactory.deflateCodec(9).toString(), 
serdeWrite.getCodec().toString());
   }
@@ -435,11 +434,11 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomXZCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.xzCodec(9));
 
-    AvroIO.Write.Bound<GenericRecord> serdeWrite = 
SerializableUtils.clone(write);
+    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
 
     assertEquals(CodecFactory.xzCodec(9).toString(), 
serdeWrite.getCodec().toString());
   }
@@ -453,7 +452,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withSchema(GenericClass.class)
             .withMetadata(ImmutableMap.<String, Object>of(
@@ -475,7 +474,8 @@ public class AvroIOTest {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
-    Bound<String> write = 
AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
+    AvroIO.Write<String> write =
+        AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class);
     if (numShards > 1) {
       System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
@@ -556,7 +556,7 @@ public class AvroIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write.Bound<?> write = AvroIO.Write
+    AvroIO.Write<?> write = AvroIO.<GenericClass>write()
         .to("foo")
         .withShardNameTemplate("-SS-of-NN-")
         .withSuffix("bar")
@@ -584,10 +584,9 @@ public class AvroIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    AvroIO.Write.Bound<?> write = AvroIO.Write
+    AvroIO.Write<?> write = AvroIO.<GenericRecord>write()
         .to(outputPath)
-        .withSchema(Schema.create(Schema.Type.STRING))
-        .withoutValidation();
+        .withSchema(Schema.create(Schema.Type.STRING));
 
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("AvroIO.Write should include the file pattern in its primitive 
transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index b974663..ba7f1b9 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -271,16 +271,16 @@ public class AvroIOTransformTest {
           ImmutableList.<Object[]>builder()
               .add(
                   new Object[] {
-                      AvroIO.Write.withSchema(AvroGeneratedUser.class),
+                      
AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class),
                       generatedClass
                   },
                   new Object[] {
-                      AvroIO.Write.withSchema(SCHEMA),
+                      AvroIO.write().withSchema(SCHEMA),
                       fromSchema
                   },
 
                   new Object[] {
-                      AvroIO.Write.withSchema(SCHEMA_STRING),
+                      AvroIO.write().withSchema(SCHEMA_STRING),
                       fromSchemaString
                   })
               .build();
@@ -288,17 +288,17 @@ public class AvroIOTransformTest {
 
     @SuppressWarnings("DefaultAnnotationParam")
     @Parameterized.Parameter(0)
-    public AvroIO.Write.Bound writeTransform;
+    public AvroIO.Write writeTransform;
 
     @Parameterized.Parameter(1)
     public String testAlias;
 
-    private <T> void runTestWrite(final AvroIO.Write.Bound<T> writeBuilder)
+    private <T> void runTestWrite(final AvroIO.Write<T> writeBuilder)
         throws Exception {
 
       final File avroFile = tmpFolder.newFile("file.avro");
       final AvroGeneratedUser[] users = generateAvroObjects();
-      final AvroIO.Write.Bound<T> write = writeBuilder.to(avroFile.getPath());
+      final AvroIO.Write<T> write = writeBuilder.to(avroFile.getPath());
 
       @SuppressWarnings("unchecked") final
       PCollection<T> input =

Reply via email to