[ 
https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=178275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-178275
 ]

ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Dec/18 16:55
            Start Date: 22/Dec/18 16:55
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #7290: [BEAM-4454] 
Support avro schema inference in sources
URL: https://github.com/apache/beam/pull/7290
 
 
   

This is a PR merged from a forked repository. Because GitHub does not
display the original diff for a merged foreign (fork) pull request, the
diff is reproduced below for the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index a22345c060fe..8c408bf8b463 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -278,6 +278,7 @@
         
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
         .setHintMatchesManyFiles(false)
         .build();
   }
@@ -288,6 +289,7 @@
         
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
         // 64MB is a reasonable value that allows to amortize the cost of 
opening files,
         // but is not so large as to exhaust a typical runner's maximum amount 
of output per
         // ProcessElement call.
@@ -301,6 +303,7 @@
         
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
+        .setInferBeamSchema(false)
         .setHintMatchesManyFiles(false)
         .build();
   }
@@ -314,6 +317,7 @@
         
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
+        .setInferBeamSchema(false)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
         .build();
   }
@@ -430,6 +434,19 @@
         .setWindowedWrites(false);
   }
 
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    org.apache.beam.sdk.schemas.Schema beamSchema =
+        org.apache.beam.sdk.schemas.utils.AvroUtils.getSchema(clazz, schema);
+    if (beamSchema != null) {
+      pc.setSchema(
+          beamSchema,
+          org.apache.beam.sdk.schemas.utils.AvroUtils.getToRowFunction(clazz, 
schema),
+          
org.apache.beam.sdk.schemas.utils.AvroUtils.getFromRowFunction(clazz));
+    }
+    return pc;
+  }
+
   /** Implementation of {@link #read} and {@link #readGenericRecords}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
@@ -444,6 +461,8 @@
     @Nullable
     abstract Schema getSchema();
 
+    abstract boolean getInferBeamSchema();
+
     abstract boolean getHintMatchesManyFiles();
 
     abstract Builder<T> toBuilder();
@@ -458,6 +477,8 @@
 
       abstract Builder<T> setSchema(Schema schema);
 
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
       abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
 
       abstract Read<T> build();
@@ -488,6 +509,11 @@
       return 
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
     /**
      * Continuously watches for new files matching the filepattern, polling it 
at the given
      * interval, until the given termination condition is reached. The 
returned {@link PCollection}
@@ -516,19 +542,22 @@
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
 
       if (getMatchConfiguration().getWatchInterval() == null && 
!getHintMatchesManyFiles()) {
-        return input.apply(
-            "Read",
-            org.apache.beam.sdk.io.Read.from(
-                createSource(
-                    getFilepattern(),
-                    getMatchConfiguration().getEmptyMatchTreatment(),
-                    getRecordClass(),
-                    getSchema())));
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema())));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), 
getSchema()) : read;
       }
       // All other cases go through ReadAll.
 
@@ -580,6 +609,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
     abstract long getDesiredBundleSizeBytes();
 
+    abstract boolean getInferBeamSchema();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -592,6 +623,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
       abstract Builder<T> setDesiredBundleSizeBytes(long 
desiredBundleSizeBytes);
 
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
       abstract ReadAll<T> build();
     }
 
@@ -618,18 +651,29 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
     }
 
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. 
This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
     @Override
     public PCollection<T> expand(PCollection<String> input) {
       checkNotNull(getSchema(), "schema");
-      return input
-          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          
.apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply(
-              "Read all via FileBasedSource",
-              new ReadAllViaFileBasedSource<>(
-                  getDesiredBundleSizeBytes(),
-                  new CreateSourceFn<>(getRecordClass(), 
getSchema().toString()),
-                  AvroCoder.of(getRecordClass(), getSchema())));
+      PCollection<T> read =
+          input
+              
.apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+              
.apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+              .apply(
+                  "Read all via FileBasedSource",
+                  new ReadAllViaFileBasedSource<>(
+                      getDesiredBundleSizeBytes(),
+                      new CreateSourceFn<>(getRecordClass(), 
getSchema().toString()),
+                      AvroCoder.of(getRecordClass(), getSchema())));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), 
getSchema()) : read;
     }
 
     @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
index 29bf51a06a77..0025864fc56c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import static org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamSchema;
+
+import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -30,7 +33,7 @@
 public class AvroRecordSchema extends GetterBasedSchemaProvider {
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return AvroUtils.getSchema(typeDescriptor.getRawType());
+    return 
toBeamSchema(ReflectData.get().getSchema(typeDescriptor.getRawType()));
   }
 
   @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index c2e737f69418..43e6a5c9a70f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -50,6 +50,7 @@
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.AvroRecordSchema;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
@@ -251,6 +252,35 @@ public static GenericRecord toGenericRecord(
     return builder.build();
   }
 
+  @SuppressWarnings("unchecked")
+  public static <T> SerializableFunction<T, Row> getToRowFunction(
+      Class<T> clazz, @Nullable org.apache.avro.Schema schema) {
+    if (GenericRecord.class.equals(clazz)) {
+      Schema beamSchema = toBeamSchema(schema);
+      return (SerializableFunction<T, Row>) 
getGenericRecordToRowFunction(beamSchema);
+    } else {
+      return new AvroRecordSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> 
clazz) {
+    return (GenericRecord.class.equals(clazz))
+        ? (SerializableFunction<Row, T>) getRowToGenericRecordFunction(null)
+        : new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(clazz));
+  }
+
+  @Nullable
+  public static <T> Schema getSchema(Class<T> clazz, @Nullable 
org.apache.avro.Schema schema) {
+    if (schema != null) {
+      return schema.getType().equals(Type.RECORD) ? toBeamSchema(schema) : 
null;
+    }
+    if (GenericRecord.class.equals(clazz)) {
+      throw new IllegalArgumentException("No schema provided for 
getSchema(GenericRecord)");
+    }
+    return new AvroRecordSchema().schemaFor(TypeDescriptor.of(clazz));
+  }
+
   /**
    * Returns a function mapping AVRO {@link GenericRecord}s to Beam {@link 
Row}s for use in {@link
    * org.apache.beam.sdk.values.PCollection#setSchema}.
@@ -269,11 +299,6 @@ public static GenericRecord toGenericRecord(
     return g -> toGenericRecord(g, avroSchema);
   }
 
-  /** Infer a {@link Schema} from either an AVRO-generated SpecificRecord or a 
POJO. */
-  public static <T> Schema getSchema(Class<T> clazz) {
-    return toBeamSchema(ReflectData.get().getSchema(clazz));
-  }
-
   private static final class AvroSpecificRecordFieldValueTypeSupplier
       implements FieldValueTypeSupplier {
     @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index 1be78837ea67..a0d40c8c5a88 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -65,11 +65,9 @@
   /**
    * A naming strategy for ByteBuddy classes.
    *
-   * <p>We always inject the generatter classes in the same same package as 
the user's target class.
-   *
-   * @kanterov kanterov 20 hours ago Contributor nit: s/generatter/generated/
-   * @reuvenlax Reply… This way, if the class fields or methods are package 
private, our generated
-   *     class can still access them.
+   * <p>We always inject the generator classes in the same same package as the 
user's target class.
+   * This way, if the class fields or methods are package private, our 
generated class can still
+   * access them.
    */
   static class InjectPackageStrategy extends NamingStrategy.AbstractBase {
     /** A resolver for the base name for naming the unnamed type. */
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 64d736fa19eb..5dff2905dc2f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -48,6 +48,7 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -105,1138 +106,1187 @@
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /** Tests for AvroIO Read and Write transforms. */
-@RunWith(JUnit4.class)
 public class AvroIOTest implements Serializable {
-
-  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
-
-  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
-
-  @Rule public transient TestPipeline windowedAvroWritePipeline = 
TestPipeline.create();
-
-  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Rule public transient ExpectedException expectedException = 
ExpectedException.none();
-
-  @Test
-  public void testAvroIOGetName() {
-    assertEquals("AvroIO.Read", 
AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
-    assertEquals("AvroIO.Write", 
AvroIO.write(String.class).to("/tmp/foo/baz").getName());
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClass {
-    int intField;
-    String stringField;
-
-    public GenericClass() {}
-
-    public GenericClass(int intField, String stringField) {
-      this.intField = intField;
-      this.stringField = stringField;
+  /** Unit tests. */
+  @RunWith(JUnit4.class)
+  public static class SimpleTests implements Serializable {
+    @Test
+    public void testAvroIOGetName() {
+      assertEquals("AvroIO.Read", 
AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
+      assertEquals("AvroIO.Write", 
AvroIO.write(String.class).to("/tmp/foo/baz").getName());
     }
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .toString();
+    @Test
+    public void testWriteWithDefaultCodec() throws Exception {
+      AvroIO.Write<String> write = 
AvroIO.write(String.class).to("/tmp/foo/baz");
+      assertEquals(CodecFactory.snappyCodec().toString(), 
write.inner.getCodec().toString());
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField);
+    @Test
+    public void testWriteWithCustomCodec() throws Exception {
+      AvroIO.Write<String> write =
+          
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
+      assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString());
     }
 
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClass)) {
-        return false;
-      }
-      GenericClass o = (GenericClass) other;
-      return Objects.equals(intField, o.intField) && 
Objects.equals(stringField, o.stringField);
-    }
-  }
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
+      AvroIO.Write<String> write =
+          
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9));
 
-  private static class ParseGenericClass
-      implements SerializableFunction<GenericRecord, GenericClass> {
-    @Override
-    public GenericClass apply(GenericRecord input) {
-      return new GenericClass((int) input.get("intField"), 
input.get("stringField").toString());
+      assertEquals(
+          CodecFactory.deflateCodec(9).toString(),
+          
SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
     }
-  }
 
-  private enum Sharding {
-    RUNNER_DETERMINED,
-    WITHOUT_SHARDING,
-    FIXED_3_SHARDS
-  }
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWriteWithSerDeCustomXZCodec() throws Exception {
+      AvroIO.Write<String> write =
+          
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9));
 
-  private enum WriteMethod {
-    AVROIO_WRITE,
-    AVROIO_SINK
-  }
+      assertEquals(
+          CodecFactory.xzCodec(9).toString(),
+          
SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
+    }
 
-  private static final String SCHEMA_STRING =
-      "{\"namespace\": \"example.avro\",\n"
-          + " \"type\": \"record\",\n"
-          + " \"name\": \"AvroGeneratedUser\",\n"
-          + " \"fields\": [\n"
-          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
-          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
-          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
-          + " ]\n"
-          + "}";
+    @Test
+    public void testReadDisplayData() {
+      AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*");
 
-  private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+      DisplayData displayData = DisplayData.from(read);
+      assertThat(displayData, hasDisplayItem("filePattern", "/foo.*"));
+    }
+  }
 
-  @Test
+  /** NeedsRunner tests. */
+  @RunWith(Parameterized.class)
   @Category(NeedsRunner.class)
-  public void testWriteThenReadJavaClass() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withoutSharding());
-    writePipeline.run();
+  public static class NeedsRunnerTests implements Serializable {
+    @Rule public transient TestPipeline writePipeline = TestPipeline.create();
 
-    PAssert.that(
-            readPipeline.apply(
-                "Read",
-                AvroIO.read(GenericClass.class)
-                    
.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
-        .containsInAnyOrder(values);
+    @Rule public transient TestPipeline readPipeline = TestPipeline.create();
 
-    readPipeline.run();
-  }
+    @Rule public transient TestPipeline windowedAvroWritePipeline = 
TestPipeline.create();
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadCustomType() throws Throwable {
-    List<Long> values = Arrays.asList(0L, 1L, 2L);
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.<Long, GenericClass>writeCustomType()
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withFormatFunction(new CreateGenericClass())
-                .withSchema(ReflectData.get().getSchema(GenericClass.class))
-                .withoutSharding());
-    writePipeline.run();
-
-    PAssert.that(
-            readPipeline
-                .apply(
-                    "Read",
-                    AvroIO.read(GenericClass.class)
-                        
.from(readPipeline.newProvider(outputFile.getAbsolutePath())))
-                .apply(
-                    MapElements.via(
-                        new SimpleFunction<GenericClass, Long>() {
-                          @Override
-                          public Long apply(GenericClass input) {
-                            return (long) input.intField;
-                          }
-                        })))
-        .containsInAnyOrder(values);
-
-    readPipeline.run();
-  }
+    @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  private <T extends GenericRecord> void testWriteThenReadGeneratedClass(
-      AvroIO.Write<T> writeTransform, AvroIO.Read<T> readTransform) throws 
Exception {
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    List<T> values =
-        ImmutableList.of(
-            (T) new AvroGeneratedUser("Bob", 256, null),
-            (T) new AvroGeneratedUser("Alice", 128, null),
-            (T) new AvroGeneratedUser("Ted", null, "white"));
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            writeTransform
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withoutSharding());
-    writePipeline.run();
-
-    PAssert.that(
-            readPipeline.apply(
-                "Read", 
readTransform.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
-        .containsInAnyOrder(values);
+    @Rule public transient ExpectedException expectedException = 
ExpectedException.none();
 
-    readPipeline.run();
-  }
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> params() {
+      return Arrays.asList(new Object[][] {{true}, {false}});
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadGeneratedClassWithClass() throws Throwable {
-    testWriteThenReadGeneratedClass(
-        AvroIO.write(AvroGeneratedUser.class), 
AvroIO.read(AvroGeneratedUser.class));
-  }
+    @Parameterized.Parameter public boolean withBeamSchemas;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadGeneratedClassWithSchema() throws Throwable {
-    testWriteThenReadGeneratedClass(
-        AvroIO.writeGenericRecords(SCHEMA), AvroIO.readGenericRecords(SCHEMA));
-  }
+    @DefaultCoder(AvroCoder.class)
+    static class GenericClass {
+      int intField;
+      String stringField;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadGeneratedClassWithSchemaString() throws 
Throwable {
-    testWriteThenReadGeneratedClass(
-        AvroIO.writeGenericRecords(SCHEMA.toString()),
-        AvroIO.readGenericRecords(SCHEMA.toString()));
-  }
+      public GenericClass() {}
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    writePipeline.run();
-
-    // Test the same data using all versions of read().
-    PCollection<String> path =
-        readPipeline.apply("Create path", 
Create.of(outputFile.getAbsolutePath()));
-    PAssert.that(
-            readPipeline.apply(
-                "Read", 
AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            readPipeline.apply(
-                "Read withHintMatchesManyFiles",
-                AvroIO.read(GenericClass.class)
-                    .from(outputFile.getAbsolutePath())
-                    .withHintMatchesManyFiles()))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            path.apply(
-                "ReadAll", 
AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            readPipeline.apply(
-                "Parse",
-                AvroIO.parseGenericRecords(new ParseGenericClass())
-                    .from(outputFile.getAbsolutePath())
-                    .withCoder(AvroCoder.of(GenericClass.class))))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            readPipeline.apply(
-                "Parse withHintMatchesManyFiles",
-                AvroIO.parseGenericRecords(new ParseGenericClass())
-                    .from(outputFile.getAbsolutePath())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .withHintMatchesManyFiles()))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            path.apply(
-                "ParseAll",
-                AvroIO.parseAllGenericRecords(new ParseGenericClass())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(values);
-
-    readPipeline.run();
-  }
+      public GenericClass(int intField, String stringField) {
+        this.intField = intField;
+        this.stringField = stringField;
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadMultipleFilepatterns() throws Throwable {
-    List<GenericClass> firstValues = Lists.newArrayList();
-    List<GenericClass> secondValues = Lists.newArrayList();
-    for (int i = 0; i < 10; ++i) {
-      firstValues.add(new GenericClass(i, "a" + i));
-      secondValues.add(new GenericClass(i, "b" + i));
-    }
-    writePipeline
-        .apply("Create first", Create.of(firstValues))
-        .apply(
-            "Write first",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
-                .withNumShards(2));
-    writePipeline
-        .apply("Create second", Create.of(secondValues))
-        .apply(
-            "Write second",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
-                .withNumShards(3));
-    writePipeline.run();
-
-    // Test readAll() and parseAllGenericRecords().
-    PCollection<String> paths =
-        readPipeline.apply(
-            "Create paths",
-            Create.of(
-                tmpFolder.getRoot().getAbsolutePath() + "/first*",
-                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
-    PAssert.that(
-            paths.apply(
-                "Read all", 
AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-    PAssert.that(
-            paths.apply(
-                "Parse all",
-                AvroIO.parseAllGenericRecords(new ParseGenericClass())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-
-    readPipeline.run();
-  }
+      @Override
+      public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+            .add("intField", intField)
+            .add("stringField", stringField)
+            .toString();
+      }
 
-  private static class CreateGenericClass extends SimpleFunction<Long, 
GenericClass> {
-    @Override
-    public GenericClass apply(Long i) {
-      return new GenericClass(i.intValue(), "value" + i);
-    }
-  }
+      @Override
+      public int hashCode() {
+        return Objects.hash(intField, stringField);
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testContinuouslyWriteAndReadMultipleFilepatterns() throws 
Throwable {
-    SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
-    List<GenericClass> firstValues = Lists.newArrayList();
-    List<GenericClass> secondValues = Lists.newArrayList();
-    for (int i = 0; i < 7; ++i) {
-      (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
+      @Override
+      public boolean equals(Object other) {
+        if (other == null || !(other instanceof GenericClass)) {
+          return false;
+        }
+        GenericClass o = (GenericClass) other;
+        return Objects.equals(intField, o.intField) && 
Objects.equals(stringField, o.stringField);
+      }
     }
-    // Configure windowing of the input so that it fires every time a new 
element is generated,
-    // so that files are written continuously.
-    Window<Long> window =
-        Window.<Long>into(FixedWindows.of(Duration.millis(100)))
-            .withAllowedLateness(Duration.ZERO)
-            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
-            .discardingFiredPanes();
-    readPipeline
-        .apply("Sequence first", GenerateSequence.from(0).to(3).withRate(1, 
Duration.millis(300)))
-        .apply("Window first", window)
-        .apply("Map first", MapElements.via(mapFn))
-        .apply(
-            "Write first",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
-                .withNumShards(2)
-                .withWindowedWrites());
-    readPipeline
-        .apply("Sequence second", GenerateSequence.from(3).to(7).withRate(1, 
Duration.millis(300)))
-        .apply("Window second", window)
-        .apply("Map second", MapElements.via(mapFn))
-        .apply(
-            "Write second",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
-                .withNumShards(3)
-                .withWindowedWrites());
 
-    // Test read(), readAll(), parse(), and parseAllGenericRecords() with 
watchForNewFiles().
-    PAssert.that(
-            readPipeline.apply(
-                "Read",
-                AvroIO.read(GenericClass.class)
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
-        .containsInAnyOrder(firstValues);
-    PAssert.that(
-            readPipeline.apply(
-                "Parse",
-                AvroIO.parseGenericRecords(new ParseGenericClass())
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
-        .containsInAnyOrder(firstValues);
-
-    PCollection<String> paths =
-        readPipeline.apply(
-            "Create paths",
-            Create.of(
-                tmpFolder.getRoot().getAbsolutePath() + "/first*",
-                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
-    PAssert.that(
-            paths.apply(
-                "Read all",
-                AvroIO.readAll(GenericClass.class)
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-    PAssert.that(
-            paths.apply(
-                "Parse all",
-                AvroIO.parseAllGenericRecords(new ParseGenericClass())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-    readPipeline.run();
-  }
+    private static class ParseGenericClass
+        implements SerializableFunction<GenericRecord, GenericClass> {
+      @Override
+      public GenericClass apply(GenericRecord input) {
+        return new GenericClass((int) input.get("intField"), 
input.get("stringField").toString());
+      }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  @Category(NeedsRunner.class)
-  public void testCompressedWriteAndReadASingleFile() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
+      @Test
+      public void testWriteDisplayData() {
+        AvroIO.Write<GenericClass> write =
             AvroIO.write(GenericClass.class)
-                .to(outputFile.getAbsolutePath())
-                .withoutSharding()
-                .withCodec(CodecFactory.deflateCodec(9)));
-    writePipeline.run();
-
-    PAssert.that(
-            
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(values);
-    readPipeline.run();
-
-    try (DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
-      assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
+                .to("/foo")
+                .withShardNameTemplate("-SS-of-NN-")
+                .withSuffix("bar")
+                .withNumShards(100)
+                .withCodec(CodecFactory.deflateCodec(6));
+
+        DisplayData displayData = DisplayData.from(write);
+
+        assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
+        assertThat(displayData, hasDisplayItem("shardNameTemplate", 
"-SS-of-NN-"));
+        assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+        assertThat(
+            displayData,
+            hasDisplayItem(
+                "schema",
+                
"{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io"
+                    + 
".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},"
+                    + "{\"name\":\"stringField\",\"type\":\"string\"}]}"));
+        assertThat(displayData, hasDisplayItem("numShards", 100));
+        assertThat(displayData, hasDisplayItem("codec", 
CodecFactory.deflateCodec(6).toString()));
+      }
     }
-  }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadASingleFileWithNullCodec() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(outputFile.getAbsolutePath())
-                .withoutSharding()
-                .withCodec(CodecFactory.nullCodec()));
-    writePipeline.run();
-
-    PAssert.that(
-            
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(values);
-    readPipeline.run();
-
-    try (DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
-      assertEquals("null", dataFileStream.getMetaString("avro.codec"));
+    private enum Sharding {
+      RUNNER_DETERMINED,
+      WITHOUT_SHARDING,
+      FIXED_3_SHARDS
     }
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClassV2 {
-    int intField;
-    String stringField;
-    @Nullable String nullableField;
 
-    public GenericClassV2() {}
-
-    public GenericClassV2(int intValue, String stringValue, String 
nullableValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
-      this.nullableField = nullableValue;
+    private enum WriteMethod {
+      AVROIO_WRITE,
+      AVROIO_SINK
     }
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .add("nullableField", nullableField)
-          .toString();
+    private static final String SCHEMA_STRING =
+        "{\"namespace\": \"example.avro\",\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"AvroGeneratedUser\",\n"
+            + " \"fields\": [\n"
+            + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+            + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
+            + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
+            + " ]\n"
+            + "}";
+
+    private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadJavaClass() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.write(GenericClass.class)
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withoutSharding());
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline.apply(
+                  "Read",
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      
.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField, nullableField);
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadCustomType() throws Throwable {
+      List<Long> values = Arrays.asList(0L, 1L, 2L);
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.<Long, GenericClass>writeCustomType()
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withFormatFunction(new CreateGenericClass())
+                  .withSchema(ReflectData.get().getSchema(GenericClass.class))
+                  .withoutSharding());
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline
+                  .apply(
+                      "Read",
+                      AvroIO.read(GenericClass.class)
+                          .withBeamSchemas(withBeamSchemas)
+                          
.from(readPipeline.newProvider(outputFile.getAbsolutePath())))
+                  .apply(
+                      MapElements.via(
+                          new SimpleFunction<GenericClass, Long>() {
+                            @Override
+                            public Long apply(GenericClass input) {
+                              return (long) input.intField;
+                            }
+                          })))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
 
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClassV2)) {
-        return false;
-      }
-      GenericClassV2 o = (GenericClassV2) other;
-      return Objects.equals(intField, o.intField)
-          && Objects.equals(stringField, o.stringField)
-          && Objects.equals(nullableField, o.nullableField);
+    private <T extends GenericRecord> void testWriteThenReadGeneratedClass(
+        AvroIO.Write<T> writeTransform, AvroIO.Read<T> readTransform) throws 
Exception {
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      List<T> values =
+          ImmutableList.of(
+              (T) new AvroGeneratedUser("Bob", 256, null),
+              (T) new AvroGeneratedUser("Alice", 128, null),
+              (T) new AvroGeneratedUser("Ted", null, "white"));
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              writeTransform
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withoutSharding());
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline.apply(
+                  "Read",
+                  
readTransform.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
-  }
-
-  /**
-   * Tests that {@code AvroIO} can read an upgraded version of an old class, 
as long as the schema
-   * resolution process succeeds. This test covers the case when a new, {@code 
@Nullable} field has
-   * been added.
-   *
-   * <p>For more information, see 
http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadSchemaUpgrade() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    writePipeline.run();
 
-    List<GenericClassV2> expected =
-        ImmutableList.of(new GenericClassV2(3, "hi", null), new 
GenericClassV2(5, "bar", null));
-
-    PAssert.that(
-            readPipeline.apply(
-                
AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(expected);
-    readPipeline.run();
-  }
-
-  private static class WindowedFilenamePolicy extends FilenamePolicy {
-    final ResourceId outputFilePrefix;
-
-    WindowedFilenamePolicy(ResourceId outputFilePrefix) {
-      this.outputFilePrefix = outputFilePrefix;
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadGeneratedClassWithClass() throws Throwable {
+      testWriteThenReadGeneratedClass(
+          AvroIO.write(AvroGeneratedUser.class),
+          
AvroIO.read(AvroGeneratedUser.class).withBeamSchemas(withBeamSchemas));
     }
 
-    @Override
-    public ResourceId windowedFilename(
-        int shardNumber,
-        int numShards,
-        BoundedWindow window,
-        PaneInfo paneInfo,
-        OutputFileHints outputFileHints) {
-      String filenamePrefix =
-          outputFilePrefix.isDirectory() ? "" : 
firstNonNull(outputFilePrefix.getFilename(), "");
-
-      IntervalWindow interval = (IntervalWindow) window;
-      String windowStr =
-          String.format("%s-%s", interval.start().toString(), 
interval.end().toString());
-      String filename =
-          String.format(
-              "%s-%s-%s-of-%s-pane-%s%s%s.avro",
-              filenamePrefix,
-              windowStr,
-              shardNumber,
-              numShards,
-              paneInfo.getIndex(),
-              paneInfo.isLast() ? "-last" : "",
-              outputFileHints.getSuggestedFilenameSuffix());
-      return outputFilePrefix.getCurrentDirectory().resolve(filename, 
RESOLVE_FILE);
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadGeneratedClassWithSchema() throws Throwable {
+      testWriteThenReadGeneratedClass(
+          AvroIO.writeGenericRecords(SCHEMA),
+          AvroIO.readGenericRecords(SCHEMA).withBeamSchemas(withBeamSchemas));
     }
 
-    @Override
-    public ResourceId unwindowedFilename(
-        int shardNumber, int numShards, OutputFileHints outputFileHints) {
-      throw new UnsupportedOperationException("Expecting windowed outputs 
only");
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadGeneratedClassWithSchemaString() throws 
Throwable {
+      testWriteThenReadGeneratedClass(
+          AvroIO.writeGenericRecords(SCHEMA.toString()),
+          
AvroIO.readGenericRecords(SCHEMA.toString()).withBeamSchemas(withBeamSchemas));
     }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(
-          DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
-              .withLabel("File Name Prefix"));
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              
AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
+      writePipeline.run();
+
+      // Test the same data using all versions of read().
+      PCollection<String> path =
+          readPipeline.apply("Create path", 
Create.of(outputFile.getAbsolutePath()));
+      PAssert.that(
+              readPipeline.apply(
+                  "Read",
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .from(outputFile.getAbsolutePath())))
+          .containsInAnyOrder(values);
+      PAssert.that(
+              readPipeline.apply(
+                  "Read withHintMatchesManyFiles",
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .from(outputFile.getAbsolutePath())
+                      .withHintMatchesManyFiles()))
+          .containsInAnyOrder(values);
+      PAssert.that(
+              path.apply(
+                  "ReadAll",
+                  AvroIO.readAll(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(values);
+      PAssert.that(
+              readPipeline.apply(
+                  "Parse",
+                  AvroIO.parseGenericRecords(new ParseGenericClass())
+                      .from(outputFile.getAbsolutePath())
+                      .withCoder(AvroCoder.of(GenericClass.class))))
+          .containsInAnyOrder(values);
+      PAssert.that(
+              readPipeline.apply(
+                  "Parse withHintMatchesManyFiles",
+                  AvroIO.parseGenericRecords(new ParseGenericClass())
+                      .from(outputFile.getAbsolutePath())
+                      .withCoder(AvroCoder.of(GenericClass.class))
+                      .withHintMatchesManyFiles()))
+          .containsInAnyOrder(values);
+      PAssert.that(
+              path.apply(
+                  "ParseAll",
+                  AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                      .withCoder(AvroCoder.of(GenericClass.class))
+                      .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
-  }
 
-  @Test
-  @Category({NeedsRunner.class, UsesTestStream.class})
-  public void testWriteWindowed() throws Throwable {
-    testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_WRITE);
-  }
-
-  @Test
-  @Category({NeedsRunner.class, UsesTestStream.class})
-  public void testWindowedAvroIOWriteViaSink() throws Throwable {
-    testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_SINK);
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadMultipleFilepatterns() throws Throwable {
+      List<GenericClass> firstValues = Lists.newArrayList();
+      List<GenericClass> secondValues = Lists.newArrayList();
+      for (int i = 0; i < 10; ++i) {
+        firstValues.add(new GenericClass(i, "a" + i));
+        secondValues.add(new GenericClass(i, "b" + i));
+      }
+      writePipeline
+          .apply("Create first", Create.of(firstValues))
+          .apply(
+              "Write first",
+              AvroIO.write(GenericClass.class)
+                  .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+                  .withNumShards(2));
+      writePipeline
+          .apply("Create second", Create.of(secondValues))
+          .apply(
+              "Write second",
+              AvroIO.write(GenericClass.class)
+                  .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+                  .withNumShards(3));
+      writePipeline.run();
 
-  void testWindowedAvroIOWriteUsingMethod(WriteMethod method) throws 
IOException {
-    Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), 
"testwrite");
-    final String baseFilename = baseDir.resolve("prefix").toString();
-
-    Instant base = new Instant(0);
-    ArrayList<GenericClass> allElements = new ArrayList<>();
-    ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new 
ArrayList<>();
-    ArrayList<Instant> firstWindowTimestamps =
-        Lists.newArrayList(
-            base.plus(Duration.ZERO), base.plus(Duration.standardSeconds(10)),
-            base.plus(Duration.standardSeconds(20)), 
base.plus(Duration.standardSeconds(30)));
-
-    Random random = new Random();
-    for (int i = 0; i < 100; ++i) {
-      GenericClass item = new GenericClass(i, String.valueOf(i));
-      allElements.add(item);
-      firstWindowElements.add(
-          TimestampedValue.of(
-              item, 
firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+      // Test readAll() and parseAllGenericRecords().
+      PCollection<String> paths =
+          readPipeline.apply(
+              "Create paths",
+              Create.of(
+                  tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                  tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+      PAssert.that(
+              paths.apply(
+                  "Read all",
+                  AvroIO.readAll(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+      PAssert.that(
+              paths.apply(
+                  "Parse all",
+                  AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                      .withCoder(AvroCoder.of(GenericClass.class))
+                      .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+
+      readPipeline.run();
     }
 
-    ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new 
ArrayList<>();
-    ArrayList<Instant> secondWindowTimestamps =
-        Lists.newArrayList(
-            base.plus(Duration.standardSeconds(60)), 
base.plus(Duration.standardSeconds(70)),
-            base.plus(Duration.standardSeconds(80)), 
base.plus(Duration.standardSeconds(90)));
-    for (int i = 100; i < 200; ++i) {
-      GenericClass item = new GenericClass(i, String.valueOf(i));
-      allElements.add(new GenericClass(i, String.valueOf(i)));
-      secondWindowElements.add(
-          TimestampedValue.of(
-              item, 
secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+    private static class CreateGenericClass extends SimpleFunction<Long, 
GenericClass> {
+      @Override
+      public GenericClass apply(Long i) {
+        return new GenericClass(i.intValue(), "value" + i);
+      }
     }
 
-    TimestampedValue<GenericClass>[] firstWindowArray =
-        firstWindowElements.toArray(new TimestampedValue[100]);
-    TimestampedValue<GenericClass>[] secondWindowArray =
-        secondWindowElements.toArray(new TimestampedValue[100]);
-
-    TestStream<GenericClass> values =
-        TestStream.create(AvroCoder.of(GenericClass.class))
-            .advanceWatermarkTo(new Instant(0))
-            .addElements(
-                firstWindowArray[0],
-                Arrays.copyOfRange(firstWindowArray, 1, 
firstWindowArray.length))
-            .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardMinutes(1)))
-            .addElements(
-                secondWindowArray[0],
-                Arrays.copyOfRange(secondWindowArray, 1, 
secondWindowArray.length))
-            .advanceWatermarkToInfinity();
-
-    final PTransform<PCollection<GenericClass>, WriteFilesResult<Void>> write;
-    switch (method) {
-      case AVROIO_WRITE:
-        {
-          FilenamePolicy policy =
-              new WindowedFilenamePolicy(
-                  FileBasedSink.convertToFileResourceIfPossible(baseFilename));
-          write =
+    @Test
+    @Category(NeedsRunner.class)
+    public void testContinuouslyWriteAndReadMultipleFilepatterns() throws 
Throwable {
+      SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
+      List<GenericClass> firstValues = Lists.newArrayList();
+      List<GenericClass> secondValues = Lists.newArrayList();
+      for (int i = 0; i < 7; ++i) {
+        (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
+      }
+      // Configure windowing of the input so that it fires every time a new 
element is generated,
+      // so that files are written continuously.
+      Window<Long> window =
+          Window.<Long>into(FixedWindows.of(Duration.millis(100)))
+              .withAllowedLateness(Duration.ZERO)
+              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+              .discardingFiredPanes();
+      readPipeline
+          .apply("Sequence first", GenerateSequence.from(0).to(3).withRate(1, 
Duration.millis(300)))
+          .apply("Window first", window)
+          .apply("Map first", MapElements.via(mapFn))
+          .apply(
+              "Write first",
               AvroIO.write(GenericClass.class)
-                  .to(policy)
-                  .withTempDirectory(
-                      StaticValueProvider.of(
-                          FileSystems.matchNewResource(baseDir.toString(), 
true)))
-                  .withWindowedWrites()
+                  .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
                   .withNumShards(2)
-                  .withOutputFilenames();
-          break;
-        }
-
-      case AVROIO_SINK:
-        {
-          write =
-              FileIO.<GenericClass>write()
-                  .via(AvroIO.sink(GenericClass.class))
-                  .to(baseDir.toString())
-                  .withPrefix("prefix")
-                  .withSuffix(".avro")
-                  .withTempDirectory(baseDir.toString())
-                  .withNumShards(2);
-          break;
-        }
-
-      default:
-        throw new UnsupportedOperationException();
+                  .withWindowedWrites());
+      readPipeline
+          .apply(
+              "Sequence second", GenerateSequence.from(3).to(7).withRate(1, 
Duration.millis(300)))
+          .apply("Window second", window)
+          .apply("Map second", MapElements.via(mapFn))
+          .apply(
+              "Write second",
+              AvroIO.write(GenericClass.class)
+                  .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+                  .withNumShards(3)
+                  .withWindowedWrites());
+
+      // Test read(), readAll(), parse(), and parseAllGenericRecords() with 
watchForNewFiles().
+      PAssert.that(
+              readPipeline.apply(
+                  "Read",
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+                      .watchForNewFiles(
+                          Duration.millis(100),
+                          
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+          .containsInAnyOrder(firstValues);
+      PAssert.that(
+              readPipeline.apply(
+                  "Parse",
+                  AvroIO.parseGenericRecords(new ParseGenericClass())
+                      .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+                      .watchForNewFiles(
+                          Duration.millis(100),
+                          
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+          .containsInAnyOrder(firstValues);
+
+      PCollection<String> paths =
+          readPipeline.apply(
+              "Create paths",
+              Create.of(
+                  tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                  tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+      PAssert.that(
+              paths.apply(
+                  "Read all",
+                  AvroIO.readAll(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .watchForNewFiles(
+                          Duration.millis(100),
+                          
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
+                      .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+      PAssert.that(
+              paths.apply(
+                  "Parse all",
+                  AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                      .withCoder(AvroCoder.of(GenericClass.class))
+                      .watchForNewFiles(
+                          Duration.millis(100),
+                          
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
+                      .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+      readPipeline.run();
     }
-    windowedAvroWritePipeline
-        .apply(values)
-        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(write);
-    windowedAvroWritePipeline.run();
-
-    // Validate that the data written matches the expected elements in the 
expected order
-    List<File> expectedFiles = new ArrayList<>();
-    for (int shard = 0; shard < 2; shard++) {
-      for (int window = 0; window < 2; window++) {
-        Instant windowStart = new 
Instant(0).plus(Duration.standardMinutes(window));
-        IntervalWindow iw = new IntervalWindow(windowStart, 
Duration.standardMinutes(1));
-        String baseAndWindow = baseFilename + "-" + iw.start() + "-" + 
iw.end();
-        switch (method) {
-          case AVROIO_WRITE:
-            expectedFiles.add(new File(baseAndWindow + "-" + shard + 
"-of-2-pane-0-last.avro"));
-            break;
-          case AVROIO_SINK:
-            expectedFiles.add(new File(baseAndWindow + "-0000" + shard + 
"-of-00002.avro"));
-            break;
-        }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    @Category(NeedsRunner.class)
+    public void testCompressedWriteAndReadASingleFile() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.write(GenericClass.class)
+                  .to(outputFile.getAbsolutePath())
+                  .withoutSharding()
+                  .withCodec(CodecFactory.deflateCodec(9)));
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline.apply(
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .from(outputFile.getAbsolutePath())))
+          .containsInAnyOrder(values);
+      readPipeline.run();
+
+      try (DataFileStream dataFileStream =
+          new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
+        assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
       }
     }
 
-    List<GenericClass> actualElements = new ArrayList<>();
-    for (File outputFile : expectedFiles) {
-      assertTrue("Expected output file " + outputFile.getAbsolutePath(), 
outputFile.exists());
-      try (DataFileReader<GenericClass> reader =
-          new DataFileReader<>(
-              outputFile,
-              new ReflectDatumReader<GenericClass>(
-                  ReflectData.get().getSchema(GenericClass.class)))) {
-        Iterators.addAll(actualElements, reader);
+    @Test
+    @SuppressWarnings("unchecked")
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadASingleFileWithNullCodec() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.write(GenericClass.class)
+                  .to(outputFile.getAbsolutePath())
+                  .withoutSharding()
+                  .withCodec(CodecFactory.nullCodec()));
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline.apply(
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .from(outputFile.getAbsolutePath())))
+          .containsInAnyOrder(values);
+      readPipeline.run();
+
+      try (DataFileStream dataFileStream =
+          new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
+        assertEquals("null", dataFileStream.getMetaString("avro.codec"));
       }
-      outputFile.delete();
     }
-    assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
-  }
 
-  private static final String SCHEMA_TEMPLATE_STRING =
-      "{\"namespace\": \"example.avro\",\n"
-          + " \"type\": \"record\",\n"
-          + " \"name\": \"TestTemplateSchema$$\",\n"
-          + " \"fields\": [\n"
-          + "     {\"name\": \"$$full\", \"type\": \"string\"},\n"
-          + "     {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n"
-          + " ]\n"
-          + "}";
-
-  private static String schemaFromPrefix(String prefix) {
-    return SCHEMA_TEMPLATE_STRING.replace("$$", prefix);
-  }
+    @DefaultCoder(AvroCoder.class)
+    static class GenericClassV2 {
+      int intField;
+      String stringField;
+      @Nullable String nullableField;
 
-  private static GenericRecord createRecord(String record, String prefix, 
Schema schema) {
-    GenericRecord genericRecord = new GenericData.Record(schema);
-    genericRecord.put(prefix + "full", record);
-    genericRecord.put(prefix + "suffix", record.substring(1));
-    return genericRecord;
-  }
+      public GenericClassV2() {}
+
+      public GenericClassV2(int intValue, String stringValue, String 
nullableValue) {
+        this.intField = intValue;
+        this.stringField = stringValue;
+        this.nullableField = nullableValue;
+      }
 
-  private static class TestDynamicDestinations
-      extends DynamicAvroDestinations<String, String, GenericRecord> {
-    ResourceId baseDir;
-    PCollectionView<Map<String, String>> schemaView;
+      @Override
+      public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+            .add("intField", intField)
+            .add("stringField", stringField)
+            .add("nullableField", nullableField)
+            .toString();
+      }
 
-    TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, 
String>> schemaView) {
-      this.baseDir = baseDir;
-      this.schemaView = schemaView;
-    }
+      @Override
+      public int hashCode() {
+        return Objects.hash(intField, stringField, nullableField);
+      }
 
-    @Override
-    public Schema getSchema(String destination) {
-      // Return a per-destination schema.
-      String schema = sideInput(schemaView).get(destination);
-      return new Schema.Parser().parse(schema);
+      @Override
+      public boolean equals(Object other) {
+        if (other == null || !(other instanceof GenericClassV2)) {
+          return false;
+        }
+        GenericClassV2 o = (GenericClassV2) other;
+        return Objects.equals(intField, o.intField)
+            && Objects.equals(stringField, o.stringField)
+            && Objects.equals(nullableField, o.nullableField);
+      }
     }
 
-    @Override
-    public List<PCollectionView<?>> getSideInputs() {
-      return ImmutableList.of(schemaView);
+    /**
+     * Tests that {@code AvroIO} can read an upgraded version of an old class, 
as long as the schema
+     * resolution process succeeds. This test covers the case when a new, 
{@code @Nullable} field
+     * has been added.
+     *
+     * <p>For more information, see 
http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
+     */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadSchemaUpgrade() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              
AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
+      writePipeline.run();
+
+      List<GenericClassV2> expected =
+          ImmutableList.of(new GenericClassV2(3, "hi", null), new 
GenericClassV2(5, "bar", null));
+
+      PAssert.that(
+              readPipeline.apply(
+                  AvroIO.read(GenericClassV2.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      .from(outputFile.getAbsolutePath())))
+          .containsInAnyOrder(expected);
+      readPipeline.run();
     }
 
-    @Override
-    public GenericRecord formatRecord(String record) {
-      String prefix = record.substring(0, 1);
-      return createRecord(record, prefix, getSchema(prefix));
-    }
+    private static class WindowedFilenamePolicy extends FilenamePolicy {
+      final ResourceId outputFilePrefix;
 
-    @Override
-    public String getDestination(String element) {
-      // Destination is based on first character of string.
-      return element.substring(0, 1);
-    }
+      WindowedFilenamePolicy(ResourceId outputFilePrefix) {
+        this.outputFilePrefix = outputFilePrefix;
+      }
+
+      @Override
+      public ResourceId windowedFilename(
+          int shardNumber,
+          int numShards,
+          BoundedWindow window,
+          PaneInfo paneInfo,
+          OutputFileHints outputFileHints) {
+        String filenamePrefix =
+            outputFilePrefix.isDirectory() ? "" : 
firstNonNull(outputFilePrefix.getFilename(), "");
+
+        IntervalWindow interval = (IntervalWindow) window;
+        String windowStr =
+            String.format("%s-%s", interval.start().toString(), 
interval.end().toString());
+        String filename =
+            String.format(
+                "%s-%s-%s-of-%s-pane-%s%s%s.avro",
+                filenamePrefix,
+                windowStr,
+                shardNumber,
+                numShards,
+                paneInfo.getIndex(),
+                paneInfo.isLast() ? "-last" : "",
+                outputFileHints.getSuggestedFilenameSuffix());
+        return outputFilePrefix.getCurrentDirectory().resolve(filename, 
RESOLVE_FILE);
+      }
+
+      @Override
+      public ResourceId unwindowedFilename(
+          int shardNumber, int numShards, OutputFileHints outputFileHints) {
+        throw new UnsupportedOperationException("Expecting windowed outputs 
only");
+      }
 
-    @Override
-    public String getDefaultDestination() {
-      return "";
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(
+            DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
+                .withLabel("File Name Prefix"));
+      }
     }
 
-    @Override
-    public FilenamePolicy getFilenamePolicy(String destination) {
-      return DefaultFilenamePolicy.fromStandardParameters(
-          StaticValueProvider.of(baseDir.resolve("file_" + destination, 
RESOLVE_FILE)),
-          "-SSSSS-of-NNNNN",
-          ".avro",
-          false);
+    @Test
+    @Category({NeedsRunner.class, UsesTestStream.class})
+    public void testWriteWindowed() throws Throwable {
+      testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_WRITE);
     }
-  }
 
-  private void testDynamicDestinationsUnwindowedWithSharding(
-      WriteMethod writeMethod, Sharding sharding) throws Exception {
-    final ResourceId baseDir =
-        FileSystems.matchNewResource(
-            Files.createTempDirectory(tmpFolder.getRoot().toPath(), 
"testDynamicDestinations")
-                .toString(),
-            true);
-
-    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", 
"caaa", "caab");
-    Multimap<String, GenericRecord> expectedElements = 
ArrayListMultimap.create();
-    Map<String, String> schemaMap = Maps.newHashMap();
-    for (String element : elements) {
-      String prefix = element.substring(0, 1);
-      String jsonSchema = schemaFromPrefix(prefix);
-      schemaMap.put(prefix, jsonSchema);
-      expectedElements.put(
-          prefix, createRecord(element, prefix, new 
Schema.Parser().parse(jsonSchema)));
+    @Test
+    @Category({NeedsRunner.class, UsesTestStream.class})
+    public void testWindowedAvroIOWriteViaSink() throws Throwable {
+      testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_SINK);
     }
-    final PCollectionView<Map<String, String>> schemaView =
-        writePipeline.apply("createSchemaView", 
Create.of(schemaMap)).apply(View.asMap());
-
-    PCollection<String> input =
-        writePipeline.apply("createInput", 
Create.of(elements).withCoder(StringUtf8Coder.of()));
-
-    switch (writeMethod) {
-      case AVROIO_WRITE:
-        {
-          AvroIO.TypedWrite<String, String, GenericRecord> write =
-              AvroIO.<String>writeCustomTypeToGenericRecords()
-                  .to(new TestDynamicDestinations(baseDir, schemaView))
-                  .withTempDirectory(baseDir);
-
-          switch (sharding) {
-            case RUNNER_DETERMINED:
-              break;
-            case WITHOUT_SHARDING:
-              write = write.withoutSharding();
-              break;
-            case FIXED_3_SHARDS:
-              write = write.withNumShards(3);
-              break;
-            default:
-              throw new IllegalArgumentException("Unknown sharding " + 
sharding);
+
+    void testWindowedAvroIOWriteUsingMethod(WriteMethod method) throws 
IOException {
+      Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), 
"testwrite");
+      final String baseFilename = baseDir.resolve("prefix").toString();
+
+      Instant base = new Instant(0);
+      ArrayList<GenericClass> allElements = new ArrayList<>();
+      ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new 
ArrayList<>();
+      ArrayList<Instant> firstWindowTimestamps =
+          Lists.newArrayList(
+              base.plus(Duration.ZERO), 
base.plus(Duration.standardSeconds(10)),
+              base.plus(Duration.standardSeconds(20)), 
base.plus(Duration.standardSeconds(30)));
+
+      Random random = new Random();
+      for (int i = 0; i < 100; ++i) {
+        GenericClass item = new GenericClass(i, String.valueOf(i));
+        allElements.add(item);
+        firstWindowElements.add(
+            TimestampedValue.of(
+                item, 
firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+      }
+
+      ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new 
ArrayList<>();
+      ArrayList<Instant> secondWindowTimestamps =
+          Lists.newArrayList(
+              base.plus(Duration.standardSeconds(60)), 
base.plus(Duration.standardSeconds(70)),
+              base.plus(Duration.standardSeconds(80)), 
base.plus(Duration.standardSeconds(90)));
+      for (int i = 100; i < 200; ++i) {
+        GenericClass item = new GenericClass(i, String.valueOf(i));
+        allElements.add(new GenericClass(i, String.valueOf(i)));
+        secondWindowElements.add(
+            TimestampedValue.of(
+                item, 
secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+      }
+
+      TimestampedValue<GenericClass>[] firstWindowArray =
+          firstWindowElements.toArray(new TimestampedValue[100]);
+      TimestampedValue<GenericClass>[] secondWindowArray =
+          secondWindowElements.toArray(new TimestampedValue[100]);
+
+      TestStream<GenericClass> values =
+          TestStream.create(AvroCoder.of(GenericClass.class))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(
+                  firstWindowArray[0],
+                  Arrays.copyOfRange(firstWindowArray, 1, 
firstWindowArray.length))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardMinutes(1)))
+              .addElements(
+                  secondWindowArray[0],
+                  Arrays.copyOfRange(secondWindowArray, 1, 
secondWindowArray.length))
+              .advanceWatermarkToInfinity();
+
+      final PTransform<PCollection<GenericClass>, WriteFilesResult<Void>> 
write;
+      switch (method) {
+        case AVROIO_WRITE:
+          {
+            FilenamePolicy policy =
+                new WindowedFilenamePolicy(
+                    
FileBasedSink.convertToFileResourceIfPossible(baseFilename));
+            write =
+                AvroIO.write(GenericClass.class)
+                    .to(policy)
+                    .withTempDirectory(
+                        StaticValueProvider.of(
+                            FileSystems.matchNewResource(baseDir.toString(), 
true)))
+                    .withWindowedWrites()
+                    .withNumShards(2)
+                    .withOutputFilenames();
+            break;
           }
 
-          input.apply(write);
-          break;
-        }
+        case AVROIO_SINK:
+          {
+            write =
+                FileIO.<GenericClass>write()
+                    .via(AvroIO.sink(GenericClass.class))
+                    .to(baseDir.toString())
+                    .withPrefix("prefix")
+                    .withSuffix(".avro")
+                    .withTempDirectory(baseDir.toString())
+                    .withNumShards(2);
+            break;
+          }
 
-      case AVROIO_SINK:
-        {
-          final AvroIO.RecordFormatter<String> formatter =
-              (element, schema) -> {
-                String prefix = element.substring(0, 1);
-                GenericRecord record = new GenericData.Record(schema);
-                record.put(prefix + "full", element);
-                record.put(prefix + "suffix", element.substring(1));
-                return record;
-              };
-          FileIO.Write<String, String> write =
-              FileIO.<String, String>writeDynamic()
-                  .by(
-                      fn(
-                          (element, c) -> {
-                            c.sideInput(schemaView); // Ignore result
-                            return element.substring(0, 1);
-                          },
-                          requiresSideInputs(schemaView)))
-                  .via(
-                      fn(
-                          (dest, c) -> {
-                            Schema schema =
-                                new 
Schema.Parser().parse(c.sideInput(schemaView).get(dest));
-                            return AvroIO.sinkViaGenericRecords(schema, 
formatter);
-                          },
-                          requiresSideInputs(schemaView)))
-                  .to(baseDir.toString())
-                  .withNaming(
-                      fn(
-                          (dest, c) -> {
-                            c.sideInput(schemaView); // Ignore result
-                            return FileIO.Write.defaultNaming("file_" + dest, 
".avro");
-                          },
-                          requiresSideInputs(schemaView)))
-                  .withTempDirectory(baseDir.toString())
-                  .withDestinationCoder(StringUtf8Coder.of())
-                  .withIgnoreWindowing();
-          switch (sharding) {
-            case RUNNER_DETERMINED:
-              break;
-            case WITHOUT_SHARDING:
-              write = write.withNumShards(1);
+        default:
+          throw new UnsupportedOperationException();
+      }
+      windowedAvroWritePipeline
+          .apply(values)
+          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+          .apply(write);
+      windowedAvroWritePipeline.run();
+
+      // Validate that the data written matches the expected elements in the 
expected order
+      List<File> expectedFiles = new ArrayList<>();
+      for (int shard = 0; shard < 2; shard++) {
+        for (int window = 0; window < 2; window++) {
+          Instant windowStart = new 
Instant(0).plus(Duration.standardMinutes(window));
+          IntervalWindow iw = new IntervalWindow(windowStart, 
Duration.standardMinutes(1));
+          String baseAndWindow = baseFilename + "-" + iw.start() + "-" + 
iw.end();
+          switch (method) {
+            case AVROIO_WRITE:
+              expectedFiles.add(new File(baseAndWindow + "-" + shard + 
"-of-2-pane-0-last.avro"));
               break;
-            case FIXED_3_SHARDS:
-              write = write.withNumShards(3);
+            case AVROIO_SINK:
+              expectedFiles.add(new File(baseAndWindow + "-0000" + shard + 
"-of-00002.avro"));
               break;
-            default:
-              throw new IllegalArgumentException("Unknown sharding " + 
sharding);
           }
+        }
+      }
 
-          input.apply(write);
-          break;
+      List<GenericClass> actualElements = new ArrayList<>();
+      for (File outputFile : expectedFiles) {
+        assertTrue("Expected output file " + outputFile.getAbsolutePath(), 
outputFile.exists());
+        try (DataFileReader<GenericClass> reader =
+            new DataFileReader<>(
+                outputFile,
+                new ReflectDatumReader<GenericClass>(
+                    ReflectData.get().getSchema(GenericClass.class)))) {
+          Iterators.addAll(actualElements, reader);
         }
+        outputFile.delete();
+      }
+      assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
     }
 
-    writePipeline.run();
-
-    // Validate that the data written matches the expected elements in the 
expected order.
-
-    for (String prefix : expectedElements.keySet()) {
-      String shardPattern;
-      switch (sharding) {
-        case RUNNER_DETERMINED:
-          shardPattern = "-*";
-          break;
-        case WITHOUT_SHARDING:
-          shardPattern = "-00000-of-00001";
-          break;
-        case FIXED_3_SHARDS:
-          shardPattern = "-*-of-00003";
-          break;
-        default:
-          throw new IllegalArgumentException("Unknown sharding " + sharding);
-      }
-      String expectedFilepattern =
-          baseDir.resolve("file_" + prefix + shardPattern + ".avro", 
RESOLVE_FILE).toString();
+    private static final String SCHEMA_TEMPLATE_STRING =
+        "{\"namespace\": \"example.avro\",\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"TestTemplateSchema$$\",\n"
+            + " \"fields\": [\n"
+            + "     {\"name\": \"$$full\", \"type\": \"string\"},\n"
+            + "     {\"name\": \"$$suffix\", \"type\": [\"string\", 
\"null\"]}\n"
+            + " ]\n"
+            + "}";
+
+    private static String schemaFromPrefix(String prefix) {
+      return SCHEMA_TEMPLATE_STRING.replace("$$", prefix);
+    }
 
-      PCollection<GenericRecord> records =
-          readPipeline.apply(
-              "read_" + prefix,
-              
AvroIO.readGenericRecords(schemaFromPrefix(prefix)).from(expectedFilepattern));
-      PAssert.that(records).containsInAnyOrder(expectedElements.get(prefix));
+    private static GenericRecord createRecord(String record, String prefix, 
Schema schema) {
+      GenericRecord genericRecord = new GenericData.Record(schema);
+      genericRecord.put(prefix + "full", record);
+      genericRecord.put(prefix + "suffix", record.substring(1));
+      return genericRecord;
     }
-    readPipeline.run();
-  }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsRunnerDeterminedSharding() throws 
Exception {
-    testDynamicDestinationsUnwindowedWithSharding(
-        WriteMethod.AVROIO_WRITE, Sharding.RUNNER_DETERMINED);
-  }
+    private static class TestDynamicDestinations
+        extends DynamicAvroDestinations<String, String, GenericRecord> {
+      ResourceId baseDir;
+      PCollectionView<Map<String, String>> schemaView;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsWithoutSharding() throws Exception {
-    testDynamicDestinationsUnwindowedWithSharding(
-        WriteMethod.AVROIO_WRITE, Sharding.WITHOUT_SHARDING);
-  }
+      TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, 
String>> schemaView) {
+        this.baseDir = baseDir;
+        this.schemaView = schemaView;
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsWithNumShards() throws Exception {
-    testDynamicDestinationsUnwindowedWithSharding(
-        WriteMethod.AVROIO_WRITE, Sharding.FIXED_3_SHARDS);
-  }
+      @Override
+      public Schema getSchema(String destination) {
+        // Return a per-destination schema.
+        String schema = sideInput(schemaView).get(destination);
+        return new Schema.Parser().parse(schema);
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsViaSinkRunnerDeterminedSharding() throws 
Exception {
-    testDynamicDestinationsUnwindowedWithSharding(
-        WriteMethod.AVROIO_SINK, Sharding.RUNNER_DETERMINED);
-  }
+      @Override
+      public List<PCollectionView<?>> getSideInputs() {
+        return ImmutableList.of(schemaView);
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsViaSinkWithoutSharding() throws Exception 
{
-    testDynamicDestinationsUnwindowedWithSharding(
-        WriteMethod.AVROIO_SINK, Sharding.WITHOUT_SHARDING);
-  }
+      @Override
+      public GenericRecord formatRecord(String record) {
+        String prefix = record.substring(0, 1);
+        return createRecord(record, prefix, getSchema(prefix));
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinationsViaSinkWithNumShards() throws Exception {
-    testDynamicDestinationsUnwindowedWithSharding(WriteMethod.AVROIO_SINK, 
Sharding.FIXED_3_SHARDS);
-  }
+      @Override
+      public String getDestination(String element) {
+        // Destination is based on first character of string.
+        return element.substring(0, 1);
+      }
 
-  @Test
-  public void testWriteWithDefaultCodec() throws Exception {
-    AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz");
-    assertEquals(CodecFactory.snappyCodec().toString(), 
write.inner.getCodec().toString());
-  }
+      @Override
+      public String getDefaultDestination() {
+        return "";
+      }
 
-  @Test
-  public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write<String> write =
-        
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
-    assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString());
-  }
+      @Override
+      public FilenamePolicy getFilenamePolicy(String destination) {
+        return DefaultFilenamePolicy.fromStandardParameters(
+            StaticValueProvider.of(baseDir.resolve("file_" + destination, 
RESOLVE_FILE)),
+            "-SSSSS-of-NNNNN",
+            ".avro",
+            false);
+      }
+    }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
-    AvroIO.Write<String> write =
-        
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9));
+    private void testDynamicDestinationsUnwindowedWithSharding(
+        WriteMethod writeMethod, Sharding sharding) throws Exception {
+      final ResourceId baseDir =
+          FileSystems.matchNewResource(
+              Files.createTempDirectory(tmpFolder.getRoot().toPath(), 
"testDynamicDestinations")
+                  .toString(),
+              true);
+
+      List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", 
"baab", "caaa", "caab");
+      Multimap<String, GenericRecord> expectedElements = 
ArrayListMultimap.create();
+      Map<String, String> schemaMap = Maps.newHashMap();
+      for (String element : elements) {
+        String prefix = element.substring(0, 1);
+        String jsonSchema = schemaFromPrefix(prefix);
+        schemaMap.put(prefix, jsonSchema);
+        expectedElements.put(
+            prefix, createRecord(element, prefix, new 
Schema.Parser().parse(jsonSchema)));
+      }
+      final PCollectionView<Map<String, String>> schemaView =
+          writePipeline.apply("createSchemaView", 
Create.of(schemaMap)).apply(View.asMap());
+
+      PCollection<String> input =
+          writePipeline.apply("createInput", 
Create.of(elements).withCoder(StringUtf8Coder.of()));
+
+      switch (writeMethod) {
+        case AVROIO_WRITE:
+          {
+            AvroIO.TypedWrite<String, String, GenericRecord> write =
+                AvroIO.<String>writeCustomTypeToGenericRecords()
+                    .to(new TestDynamicDestinations(baseDir, schemaView))
+                    .withTempDirectory(baseDir);
+
+            switch (sharding) {
+              case RUNNER_DETERMINED:
+                break;
+              case WITHOUT_SHARDING:
+                write = write.withoutSharding();
+                break;
+              case FIXED_3_SHARDS:
+                write = write.withNumShards(3);
+                break;
+              default:
+                throw new IllegalArgumentException("Unknown sharding " + 
sharding);
+            }
+
+            input.apply(write);
+            break;
+          }
 
-    assertEquals(
-        CodecFactory.deflateCodec(9).toString(),
-        SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
-  }
+        case AVROIO_SINK:
+          {
+            final AvroIO.RecordFormatter<String> formatter =
+                (element, schema) -> {
+                  String prefix = element.substring(0, 1);
+                  GenericRecord record = new GenericData.Record(schema);
+                  record.put(prefix + "full", element);
+                  record.put(prefix + "suffix", element.substring(1));
+                  return record;
+                };
+            FileIO.Write<String, String> write =
+                FileIO.<String, String>writeDynamic()
+                    .by(
+                        fn(
+                            (element, c) -> {
+                              c.sideInput(schemaView); // Ignore result
+                              return element.substring(0, 1);
+                            },
+                            requiresSideInputs(schemaView)))
+                    .via(
+                        fn(
+                            (dest, c) -> {
+                              Schema schema =
+                                  new 
Schema.Parser().parse(c.sideInput(schemaView).get(dest));
+                              return AvroIO.sinkViaGenericRecords(schema, 
formatter);
+                            },
+                            requiresSideInputs(schemaView)))
+                    .to(baseDir.toString())
+                    .withNaming(
+                        fn(
+                            (dest, c) -> {
+                              c.sideInput(schemaView); // Ignore result
+                              return FileIO.Write.defaultNaming("file_" + 
dest, ".avro");
+                            },
+                            requiresSideInputs(schemaView)))
+                    .withTempDirectory(baseDir.toString())
+                    .withDestinationCoder(StringUtf8Coder.of())
+                    .withIgnoreWindowing();
+            switch (sharding) {
+              case RUNNER_DETERMINED:
+                break;
+              case WITHOUT_SHARDING:
+                write = write.withNumShards(1);
+                break;
+              case FIXED_3_SHARDS:
+                write = write.withNumShards(3);
+                break;
+              default:
+                throw new IllegalArgumentException("Unknown sharding " + 
sharding);
+            }
+
+            input.apply(write);
+            break;
+          }
+      }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testWriteWithSerDeCustomXZCodec() throws Exception {
-    AvroIO.Write<String> write =
-        
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9));
+      writePipeline.run();
 
-    assertEquals(
-        CodecFactory.xzCodec(9).toString(),
-        SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
-  }
+      // Validate that the data written matches the expected elements in the 
expected order.
 
-  @Test
-  @SuppressWarnings("unchecked")
-  @Category(NeedsRunner.class)
-  public void testMetadata() throws Exception {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(outputFile.getAbsolutePath())
-                .withoutSharding()
-                .withMetadata(
-                    ImmutableMap.of(
-                        "stringKey",
-                        "stringValue",
-                        "longKey",
-                        100L,
-                        "bytesKey",
-                        "bytesValue".getBytes(Charsets.UTF_8))));
-    writePipeline.run();
-
-    try (DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
-      assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
-      assertEquals(100L, dataFileStream.getMetaLong("longKey"));
-      assertArrayEquals("bytesValue".getBytes(Charsets.UTF_8), 
dataFileStream.getMeta("bytesKey"));
+      for (String prefix : expectedElements.keySet()) {
+        String shardPattern;
+        switch (sharding) {
+          case RUNNER_DETERMINED:
+            shardPattern = "-*";
+            break;
+          case WITHOUT_SHARDING:
+            shardPattern = "-00000-of-00001";
+            break;
+          case FIXED_3_SHARDS:
+            shardPattern = "-*-of-00003";
+            break;
+          default:
+            throw new IllegalArgumentException("Unknown sharding " + sharding);
+        }
+        String expectedFilepattern =
+            baseDir.resolve("file_" + prefix + shardPattern + ".avro", 
RESOLVE_FILE).toString();
+
+        PCollection<GenericRecord> records =
+            readPipeline.apply(
+                "read_" + prefix,
+                AvroIO.readGenericRecords(schemaFromPrefix(prefix))
+                    .withBeamSchemas(withBeamSchemas)
+                    .from(expectedFilepattern));
+        PAssert.that(records).containsInAnyOrder(expectedElements.get(prefix));
+      }
+      readPipeline.run();
     }
-  }
 
-  @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for 
tests.
-  private void runTestWrite(String[] expectedElements, int numShards) throws 
IOException {
-    File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
-    String outputFilePrefix = baseOutputFile.getAbsolutePath();
-
-    AvroIO.Write<String> write =
-        AvroIO.write(String.class).to(outputFilePrefix).withSuffix(".avro");
-    if (numShards > 1) {
-      write = write.withNumShards(numShards);
-    } else {
-      write = write.withoutSharding();
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsRunnerDeterminedSharding() throws 
Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_WRITE, Sharding.RUNNER_DETERMINED);
     }
-    
writePipeline.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
-    writePipeline.run();
 
-    String shardNameTemplate =
-        firstNonNull(
-            write.inner.getShardTemplate(),
-            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsWithoutSharding() throws Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_WRITE, Sharding.WITHOUT_SHARDING);
+    }
 
-    assertTestOutputs(expectedElements, numShards, outputFilePrefix, 
shardNameTemplate);
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsWithNumShards() throws Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_WRITE, Sharding.FIXED_3_SHARDS);
+    }
 
-  public static void assertTestOutputs(
-      String[] expectedElements, int numShards, String outputFilePrefix, 
String shardNameTemplate)
-      throws IOException {
-    // Validate that the data written matches the expected elements in the 
expected order
-    List<File> expectedFiles = new ArrayList<>();
-    for (int i = 0; i < numShards; i++) {
-      expectedFiles.add(
-          new File(
-              DefaultFilenamePolicy.constructName(
-                      
FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix),
-                      shardNameTemplate,
-                      ".avro",
-                      i,
-                      numShards,
-                      null,
-                      null)
-                  .toString()));
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsViaSinkRunnerDeterminedSharding() 
throws Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_SINK, Sharding.RUNNER_DETERMINED);
     }
 
-    List<String> actualElements = new ArrayList<>();
-    for (File outputFile : expectedFiles) {
-      assertTrue("Expected output file " + outputFile.getName(), 
outputFile.exists());
-      try (DataFileReader<String> reader =
-          new DataFileReader<>(
-              outputFile, new 
ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
-        Iterators.addAll(actualElements, reader);
-      }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsViaSinkWithoutSharding() throws 
Exception {
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_SINK, Sharding.WITHOUT_SHARDING);
     }
-    assertThat(actualElements, containsInAnyOrder(expectedElements));
-  }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testAvroSinkWrite() throws Exception {
-    String[] expectedElements = new String[] {"first", "second", "third"};
+    @Test
+    @Category(NeedsRunner.class)
+    public void testDynamicDestinationsViaSinkWithNumShards() throws Exception 
{
+      testDynamicDestinationsUnwindowedWithSharding(
+          WriteMethod.AVROIO_SINK, Sharding.FIXED_3_SHARDS);
+    }
 
-    runTestWrite(expectedElements, 1);
-  }
+    @Test
+    @SuppressWarnings("unchecked")
+    @Category(NeedsRunner.class)
+    public void testMetadata() throws Exception {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.write(GenericClass.class)
+                  .to(outputFile.getAbsolutePath())
+                  .withoutSharding()
+                  .withMetadata(
+                      ImmutableMap.of(
+                          "stringKey",
+                          "stringValue",
+                          "longKey",
+                          100L,
+                          "bytesKey",
+                          "bytesValue".getBytes(Charsets.UTF_8))));
+      writePipeline.run();
+
+      try (DataFileStream dataFileStream =
+          new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
+        assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
+        assertEquals(100L, dataFileStream.getMetaLong("longKey"));
+        assertArrayEquals(
+            "bytesValue".getBytes(Charsets.UTF_8), 
dataFileStream.getMeta("bytesKey"));
+      }
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testAvroSinkShardedWrite() throws Exception {
-    String[] expectedElements = new String[] {"first", "second", "third", 
"fourth", "fifth"};
+    @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for 
tests.
+    private void runTestWrite(String[] expectedElements, int numShards) throws 
IOException {
+      File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
+      String outputFilePrefix = baseOutputFile.getAbsolutePath();
+
+      AvroIO.Write<String> write =
+          AvroIO.write(String.class).to(outputFilePrefix).withSuffix(".avro");
+      if (numShards > 1) {
+        write = write.withNumShards(numShards);
+      } else {
+        write = write.withoutSharding();
+      }
+      
writePipeline.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
+      writePipeline.run();
 
-    runTestWrite(expectedElements, 4);
-  }
-  // TODO: for Write only, test withSuffix,
-  // withShardNameTemplate and withoutSharding.
+      String shardNameTemplate =
+          firstNonNull(
+              write.inner.getShardTemplate(),
+              DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
 
-  @Test
-  public void testReadDisplayData() {
-    AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*");
+      assertTestOutputs(expectedElements, numShards, outputFilePrefix, 
shardNameTemplate);
+    }
 
-    DisplayData displayData = DisplayData.from(read);
-    assertThat(displayData, hasDisplayItem("filePattern", "/foo.*"));
-  }
+    public static void assertTestOutputs(
+        String[] expectedElements, int numShards, String outputFilePrefix, 
String shardNameTemplate)
+        throws IOException {
+      // Validate that the data written matches the expected elements in the 
expected order
+      List<File> expectedFiles = new ArrayList<>();
+      for (int i = 0; i < numShards; i++) {
+        expectedFiles.add(
+            new File(
+                DefaultFilenamePolicy.constructName(
+                        
FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix),
+                        shardNameTemplate,
+                        ".avro",
+                        i,
+                        numShards,
+                        null,
+                        null)
+                    .toString()));
+      }
+
+      List<String> actualElements = new ArrayList<>();
+      for (File outputFile : expectedFiles) {
+        assertTrue("Expected output file " + outputFile.getName(), 
outputFile.exists());
+        try (DataFileReader<String> reader =
+            new DataFileReader<>(
+                outputFile, new 
ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
+          Iterators.addAll(actualElements, reader);
+        }
+      }
+      assertThat(actualElements, containsInAnyOrder(expectedElements));
+    }
 
-  @Test
-  @Category(ValidatesRunner.class)
-  public void testPrimitiveReadDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    @Test
+    @Category(NeedsRunner.class)
+    public void testAvroSinkWrite() throws Exception {
+      String[] expectedElements = new String[] {"first", "second", "third"};
 
-    AvroIO.Read<GenericRecord> read =
-        
AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*");
+      runTestWrite(expectedElements, 1);
+    }
 
-    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat(
-        "AvroIO.Read should include the file pattern in its primitive 
transform",
-        displayData,
-        hasItem(hasDisplayItem("filePattern")));
-  }
+    @Test
+    @Category(NeedsRunner.class)
+    public void testAvroSinkShardedWrite() throws Exception {
+      String[] expectedElements = new String[] {"first", "second", "third", 
"fourth", "fifth"};
 
-  @Test
-  public void testWriteDisplayData() {
-    AvroIO.Write<GenericClass> write =
-        AvroIO.write(GenericClass.class)
-            .to("/foo")
-            .withShardNameTemplate("-SS-of-NN-")
-            .withSuffix("bar")
-            .withNumShards(100)
-            .withCodec(CodecFactory.deflateCodec(6));
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
-    assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
-    assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
-    assertThat(
-        displayData,
-        hasDisplayItem(
-            "schema",
-            
"{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io"
-                + 
".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},"
-                + "{\"name\":\"stringField\",\"type\":\"string\"}]}"));
-    assertThat(displayData, hasDisplayItem("numShards", 100));
-    assertThat(displayData, hasDisplayItem("codec", 
CodecFactory.deflateCodec(6).toString()));
+      runTestWrite(expectedElements, 4);
+    }
+    // TODO: for Write only, test withSuffix,
+    // withShardNameTemplate and withoutSharding.
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testPrimitiveReadDisplayData() {
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+      AvroIO.Read<GenericRecord> read =
+          AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING))
+              .withBeamSchemas(withBeamSchemas)
+              .from("/foo.*");
+
+      Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
+      assertThat(
+          "AvroIO.Read should include the file pattern in its primitive 
transform",
+          displayData,
+          hasItem(hasDisplayItem("filePattern")));
+    }
   }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
index 3c22f4b62cf1..5e7feb195d32 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
@@ -290,7 +290,7 @@ public void testSpecificRecordSchema() {
 
   @Test
   public void testPojoSchema() {
-    assertEquals(POJO_SCHEMA, AvroUtils.getSchema(AvroPojo.class));
+    assertEquals(POJO_SCHEMA, new 
AvroRecordSchema().schemaFor(TypeDescriptor.of(AvroPojo.class)));
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index e39b44f95bd1..bcd2fa9b66eb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -19,7 +19,9 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.api.client.util.Clock;
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Message;
@@ -33,7 +35,11 @@
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import javax.naming.SizeLimitExceededException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -46,6 +52,8 @@
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -58,6 +66,7 @@
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -429,7 +438,10 @@ public String toString() {
 
   /** Returns A {@link PTransform} that continuously reads from a Google Cloud 
Pub/Sub stream. */
   private static <T> Read<T> read() {
-    return new 
AutoValue_PubsubIO_Read.Builder<T>().setNeedsAttributes(false).build();
+    return new AutoValue_PubsubIO_Read.Builder<T>()
+        .setNeedsAttributes(false)
+        .setPubsubClientFactory(FACTORY)
+        .build();
   }
 
   /**
@@ -439,6 +451,7 @@ public String toString() {
    */
   public static Read<PubsubMessage> readMessages() {
     return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>()
+        .setPubsubClientFactory(FACTORY)
         .setCoder(PubsubMessagePayloadOnlyCoder.of())
         .setParseFn(new IdentityMessageFn())
         .setNeedsAttributes(false)
@@ -452,6 +465,7 @@ public String toString() {
    */
   public static Read<PubsubMessage> readMessagesWithAttributes() {
     return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>()
+        .setPubsubClientFactory(FACTORY)
         .setCoder(PubsubMessageWithAttributesCoder.of())
         .setParseFn(new IdentityMessageFn())
         .setNeedsAttributes(true)
@@ -463,8 +477,12 @@ public String toString() {
    * Pub/Sub stream.
    */
   public static Read<String> readStrings() {
-    return PubsubIO.<String>read()
-        .withCoderAndParseFn(StringUtf8Coder.of(), new ParsePayloadAsUtf8());
+    return new AutoValue_PubsubIO_Read.Builder<String>()
+        .setNeedsAttributes(false)
+        .setPubsubClientFactory(FACTORY)
+        .setCoder(StringUtf8Coder.of())
+        .setParseFn(new ParsePayloadAsUtf8())
+        .build();
   }
 
   /**
@@ -476,7 +494,12 @@ public String toString() {
     // We should not be relying on the fact that ProtoCoder's wire format is 
identical to
     // the protobuf wire format, as the wire format is not part of a coder's 
API.
     ProtoCoder<T> coder = ProtoCoder.of(messageClass);
-    return PubsubIO.<T>read().withCoderAndParseFn(coder, new 
ParsePayloadUsingCoder<>(coder));
+    return new AutoValue_PubsubIO_Read.Builder<T>()
+        .setNeedsAttributes(false)
+        .setPubsubClientFactory(FACTORY)
+        .setCoder(coder)
+        .setParseFn(new ParsePayloadUsingCoder<>(coder))
+        .build();
   }
 
   /**
@@ -488,7 +511,58 @@ public String toString() {
     // We should not be relying on the fact that AvroCoder's wire format is 
identical to
     // the Avro wire format, as the wire format is not part of a coder's API.
     AvroCoder<T> coder = AvroCoder.of(clazz);
-    return PubsubIO.<T>read().withCoderAndParseFn(coder, new 
ParsePayloadUsingCoder<>(coder));
+    return new AutoValue_PubsubIO_Read.Builder<T>()
+        .setNeedsAttributes(false)
+        .setPubsubClientFactory(FACTORY)
+        .setCoder(coder)
+        .setParseFn(new ParsePayloadUsingCoder<>(coder))
+        .build();
+  }
+
+  /**
+   * Returns a {@link PTransform} that continuously reads binary encoded Avro 
messages into the Avro
+   * {@link GenericRecord} type.
+   *
+   * <p>Beam will infer a schema for the Avro schema. This allows the output 
to be used by SQL and
+   * by the schema-transform library.
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static Read<GenericRecord> 
readAvroGenericRecords(org.apache.avro.Schema avroSchema) {
+    Schema schema = AvroUtils.getSchema(GenericRecord.class, avroSchema);
+    AvroCoder<GenericRecord> coder = AvroCoder.of(GenericRecord.class, 
avroSchema);
+    return new AutoValue_PubsubIO_Read.Builder<GenericRecord>()
+        .setNeedsAttributes(false)
+        .setPubsubClientFactory(FACTORY)
+        .setBeamSchema(schema)
+        .setToRowFn(AvroUtils.getToRowFunction(GenericRecord.class, 
avroSchema))
+        .setFromRowFn(AvroUtils.getFromRowFunction(GenericRecord.class))
+        .setParseFn(new ParsePayloadUsingCoder<>(coder))
+        .build();
+  }
+
+  /**
+   * Returns a {@link PTransform} that continuously reads binary encoded Avro 
messages of the
+   * specific type.
+   *
+   * <p>Beam will infer a schema for the Avro schema. This allows the output 
to be used by SQL and
+   * by the schema-transform library.
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static <T> Read<T> readAvrosWithBeamSchema(Class<T> clazz) {
+    if (clazz.equals(GenericRecord.class)) {
+      throw new IllegalArgumentException("For GenericRecord, please call 
readAvroGenericRecords");
+    }
+    org.apache.avro.Schema avroSchema = ReflectData.get().getSchema(clazz);
+    AvroCoder<T> coder = AvroCoder.of(clazz);
+    Schema schema = AvroUtils.getSchema(clazz, null);
+    return new AutoValue_PubsubIO_Read.Builder<T>()
+        .setNeedsAttributes(false)
+        .setPubsubClientFactory(FACTORY)
+        .setBeamSchema(schema)
+        .setToRowFn(AvroUtils.getToRowFunction(clazz, avroSchema))
+        .setFromRowFn(AvroUtils.getFromRowFunction(clazz))
+        .setParseFn(new ParsePayloadUsingCoder<>(coder))
+        .build();
   }
 
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub 
stream. */
@@ -553,6 +627,20 @@ public String toString() {
     @Nullable
     abstract SimpleFunction<PubsubMessage, T> getParseFn();
 
+    @Nullable
+    abstract Schema getBeamSchema();
+
+    @Nullable
+    abstract SerializableFunction<T, Row> getToRowFn();
+
+    @Nullable
+    abstract SerializableFunction<Row, T> getFromRowFn();
+
+    abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();
+
+    @Nullable
+    abstract Clock getClock();
+
     abstract boolean getNeedsAttributes();
 
     abstract Builder<T> toBuilder();
@@ -571,8 +659,18 @@ public String toString() {
 
       abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
 
+      abstract Builder<T> setBeamSchema(@Nullable Schema beamSchema);
+
+      abstract Builder<T> setToRowFn(@Nullable SerializableFunction<T, Row> 
toRowFn);
+
+      abstract Builder<T> setFromRowFn(@Nullable SerializableFunction<Row, T> 
fromRowFn);
+
       abstract Builder<T> setNeedsAttributes(boolean needsAttributes);
 
+      abstract Builder<T> 
setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory);
+
+      abstract Builder<T> setClock(@Nullable Clock clock);
+
       abstract Read<T> build();
     }
 
@@ -682,6 +780,26 @@ public String toString() {
       return toBuilder().setCoder(coder).setParseFn(parseFn).build();
     }
 
+    @VisibleForTesting
+    /**
+     * Set's the PubsubClientFactory.
+     *
+     * <p>Only for use by unit tests.
+     */
+    Read<T> withClientFactory(PubsubClient.PubsubClientFactory clientFactory) {
+      return toBuilder().setPubsubClientFactory(clientFactory).build();
+    }
+
+    @VisibleForTesting
+    /**
+     * Set's the internal Clock.
+     *
+     * <p>Only for use by unit tests.
+     */
+    Read<T> withClock(Clock clock) {
+      return toBuilder().setClock(clock).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       if (getTopicProvider() == null && getSubscriptionProvider() == null) {
@@ -705,14 +823,18 @@ public String toString() {
               : NestedValueProvider.of(getSubscriptionProvider(), new 
SubscriptionPathTranslator());
       PubsubUnboundedSource source =
           new PubsubUnboundedSource(
-              FACTORY,
+              getClock(),
+              getPubsubClientFactory(),
               null /* always get project from runtime PipelineOptions */,
               topicPath,
               subscriptionPath,
               getTimestampAttribute(),
               getIdAttribute(),
               getNeedsAttributes());
-      return 
input.apply(source).apply(MapElements.via(getParseFn())).setCoder(getCoder());
+      PCollection<T> read = 
input.apply(source).apply(MapElements.via(getParseFn()));
+      return (getBeamSchema() != null)
+          ? read.setSchema(getBeamSchema(), getToRowFn(), getFromRowFn())
+          : read.setCoder(getCoder());
     }
 
     @Override
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 48a6a7eb957e..7d278a310b6f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -28,22 +28,49 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
+import com.google.api.client.util.Clock;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.AvroGeneratedUser;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
+import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+import org.testng.collections.Lists;
 
 /** Tests for PubsubIO Read and Write transforms. */
 @RunWith(JUnit4.class)
@@ -248,4 +275,170 @@ public void testPrimitiveWriteDisplayData() {
         displayData,
         hasItem(hasDisplayItem("topic")));
   }
+
+  static class GenericClass {
+    int intField;
+    String stringField;
+
+    public GenericClass() {}
+
+    public GenericClass(int intField, String stringField) {
+      this.intField = intField;
+      this.stringField = stringField;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("intField", intField)
+          .add("stringField", stringField)
+          .toString();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(intField, stringField);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof GenericClass)) {
+        return false;
+      }
+      GenericClass o = (GenericClass) other;
+      return Objects.equals(intField, o.intField) && 
Objects.equals(stringField, o.stringField);
+    }
+  }
+
+  private transient PipelineOptions options;
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("test-project", 
"testSubscription");
+  private static final Clock CLOCK = (Clock & Serializable) () -> 673L;
+  transient TestPipeline readPipeline;
+
+  private static final String SCHEMA_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"AvroGeneratedUser\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
+          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
+          + " ]\n"
+          + "}";
+
+  private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+
+  @Rule
+  public final transient TestRule setupPipeline =
+      new TestRule() {
+        @Override
+        public Statement apply(final Statement base, final Description 
description) {
+          // We need to set up the temporary folder, and then set up the 
TestPipeline based on the
+          // chosen folder. Unfortunately, since rule evaluation order is 
unspecified and unrelated
+          // to field order, and is separate from construction, that requires 
manually creating this
+          // TestRule.
+          Statement withPipeline =
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  options = TestPipeline.testingPipelineOptions();
+                  options.as(PubsubOptions.class).setProject("test-project");
+                  readPipeline = TestPipeline.fromOptions(options);
+                  readPipeline.apply(base, description).evaluate();
+                }
+              };
+          return withPipeline;
+        }
+      };
+
+  private <T> void setupTestClient(List<T> inputs, Coder<T> coder) {
+    List<IncomingMessage> messages =
+        inputs
+            .stream()
+            .map(
+                t -> {
+                  try {
+                    return CoderUtils.encodeToByteArray(coder, t);
+                  } catch (CoderException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .map(
+                ba ->
+                    new IncomingMessage(
+                        ba,
+                        null,
+                        1234L,
+                        0,
+                        UUID.randomUUID().toString(),
+                        UUID.randomUUID().toString()))
+            .collect(Collectors.toList());
+
+    clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 
60, messages);
+  }
+
+  private PubsubTestClientFactory clientFactory;
+
+  @After
+  public void after() throws IOException {
+    if (clientFactory != null) {
+      clientFactory.close();
+      clientFactory = null;
+    }
+  }
+
+  @Test
+  public void testAvroGenericRecords() {
+    AvroCoder<GenericRecord> coder = AvroCoder.of(GenericRecord.class, SCHEMA);
+    List<GenericRecord> inputs =
+        ImmutableList.of(
+            new AvroGeneratedUser("Bob", 256, null),
+            new AvroGeneratedUser("Alice", 128, null),
+            new AvroGeneratedUser("Ted", null, "white"));
+    setupTestClient(inputs, coder);
+    PCollection<GenericRecord> read =
+        readPipeline.apply(
+            PubsubIO.readAvroGenericRecords(SCHEMA)
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+    PAssert.that(read).containsInAnyOrder(inputs);
+    readPipeline.run();
+  }
+
+  @Test
+  public void testAvroPojo() {
+    AvroCoder<GenericClass> coder = AvroCoder.of(GenericClass.class);
+    List<GenericClass> inputs =
+        Lists.newArrayList(new GenericClass(1, "foo"), new GenericClass(2, 
"bar"));
+    setupTestClient(inputs, coder);
+    PCollection<GenericClass> read =
+        readPipeline.apply(
+            PubsubIO.readAvrosWithBeamSchema(GenericClass.class)
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+    PAssert.that(read).containsInAnyOrder(inputs);
+    readPipeline.run();
+  }
+
+  @Test
+  public void testAvroSpecificRecord() {
+    AvroCoder<AvroGeneratedUser> coder = AvroCoder.of(AvroGeneratedUser.class);
+    List<AvroGeneratedUser> inputs =
+        ImmutableList.of(
+            new AvroGeneratedUser("Bob", 256, null),
+            new AvroGeneratedUser("Alice", 128, null),
+            new AvroGeneratedUser("Ted", null, "white"));
+    setupTestClient(inputs, coder);
+    PCollection<AvroGeneratedUser> read =
+        readPipeline.apply(
+            PubsubIO.readAvrosWithBeamSchema(AvroGeneratedUser.class)
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+    PAssert.that(read).containsInAnyOrder(inputs);
+    readPipeline.run();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 178275)
    Time Spent: 13.5h  (was: 13h 20m)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to