[ 
https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=178031&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-178031
 ]

ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Dec/18 15:57
            Start Date: 21/Dec/18 15:57
    Worklog Time Spent: 10m 
      Work Description: kanterov commented on a change in pull request #7290: 
[BEAM-4454] Support avro schema inference in sources
URL: https://github.com/apache/beam/pull/7290#discussion_r243620720
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
 ##########
 @@ -105,1138 +106,1187 @@
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /** Tests for AvroIO Read and Write transforms. */
-@RunWith(JUnit4.class)
 public class AvroIOTest implements Serializable {
-
-  @Rule public transient TestPipeline writePipeline = TestPipeline.create();
-
-  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
-
-  @Rule public transient TestPipeline windowedAvroWritePipeline = 
TestPipeline.create();
-
-  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Rule public transient ExpectedException expectedException = 
ExpectedException.none();
-
-  @Test
-  public void testAvroIOGetName() {
-    assertEquals("AvroIO.Read", 
AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
-    assertEquals("AvroIO.Write", 
AvroIO.write(String.class).to("/tmp/foo/baz").getName());
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClass {
-    int intField;
-    String stringField;
-
-    public GenericClass() {}
-
-    public GenericClass(int intField, String stringField) {
-      this.intField = intField;
-      this.stringField = stringField;
+  /** Unit tests. */
+  @RunWith(JUnit4.class)
+  public static class SimpleTests implements Serializable {
+    @Test
+    public void testAvroIOGetName() {
+      assertEquals("AvroIO.Read", 
AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
+      assertEquals("AvroIO.Write", 
AvroIO.write(String.class).to("/tmp/foo/baz").getName());
     }
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .toString();
+    @Test
+    public void testWriteWithDefaultCodec() throws Exception {
+      AvroIO.Write<String> write = 
AvroIO.write(String.class).to("/tmp/foo/baz");
+      assertEquals(CodecFactory.snappyCodec().toString(), 
write.inner.getCodec().toString());
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField);
+    @Test
+    public void testWriteWithCustomCodec() throws Exception {
+      AvroIO.Write<String> write =
+          
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
+      assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString());
     }
 
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClass)) {
-        return false;
-      }
-      GenericClass o = (GenericClass) other;
-      return Objects.equals(intField, o.intField) && 
Objects.equals(stringField, o.stringField);
-    }
-  }
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
+      AvroIO.Write<String> write =
+          
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9));
 
-  private static class ParseGenericClass
-      implements SerializableFunction<GenericRecord, GenericClass> {
-    @Override
-    public GenericClass apply(GenericRecord input) {
-      return new GenericClass((int) input.get("intField"), 
input.get("stringField").toString());
+      assertEquals(
+          CodecFactory.deflateCodec(9).toString(),
+          
SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
     }
-  }
 
-  private enum Sharding {
-    RUNNER_DETERMINED,
-    WITHOUT_SHARDING,
-    FIXED_3_SHARDS
-  }
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWriteWithSerDeCustomXZCodec() throws Exception {
+      AvroIO.Write<String> write =
+          
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9));
 
-  private enum WriteMethod {
-    AVROIO_WRITE,
-    AVROIO_SINK
-  }
+      assertEquals(
+          CodecFactory.xzCodec(9).toString(),
+          
SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
+    }
 
-  private static final String SCHEMA_STRING =
-      "{\"namespace\": \"example.avro\",\n"
-          + " \"type\": \"record\",\n"
-          + " \"name\": \"AvroGeneratedUser\",\n"
-          + " \"fields\": [\n"
-          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
-          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
-          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
-          + " ]\n"
-          + "}";
+    @Test
+    public void testReadDisplayData() {
+      AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*");
 
-  private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+      DisplayData displayData = DisplayData.from(read);
+      assertThat(displayData, hasDisplayItem("filePattern", "/foo.*"));
+    }
+  }
 
-  @Test
+  /** NeedsRunner tests. */
+  @RunWith(Parameterized.class)
   @Category(NeedsRunner.class)
-  public void testWriteThenReadJavaClass() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withoutSharding());
-    writePipeline.run();
+  public static class NeedsRunnerTests implements Serializable {
+    @Rule public transient TestPipeline writePipeline = TestPipeline.create();
 
-    PAssert.that(
-            readPipeline.apply(
-                "Read",
-                AvroIO.read(GenericClass.class)
-                    
.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
-        .containsInAnyOrder(values);
+    @Rule public transient TestPipeline readPipeline = TestPipeline.create();
 
-    readPipeline.run();
-  }
+    @Rule public transient TestPipeline windowedAvroWritePipeline = 
TestPipeline.create();
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadCustomType() throws Throwable {
-    List<Long> values = Arrays.asList(0L, 1L, 2L);
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.<Long, GenericClass>writeCustomType()
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withFormatFunction(new CreateGenericClass())
-                .withSchema(ReflectData.get().getSchema(GenericClass.class))
-                .withoutSharding());
-    writePipeline.run();
-
-    PAssert.that(
-            readPipeline
-                .apply(
-                    "Read",
-                    AvroIO.read(GenericClass.class)
-                        
.from(readPipeline.newProvider(outputFile.getAbsolutePath())))
-                .apply(
-                    MapElements.via(
-                        new SimpleFunction<GenericClass, Long>() {
-                          @Override
-                          public Long apply(GenericClass input) {
-                            return (long) input.intField;
-                          }
-                        })))
-        .containsInAnyOrder(values);
-
-    readPipeline.run();
-  }
+    @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  private <T extends GenericRecord> void testWriteThenReadGeneratedClass(
-      AvroIO.Write<T> writeTransform, AvroIO.Read<T> readTransform) throws 
Exception {
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    List<T> values =
-        ImmutableList.of(
-            (T) new AvroGeneratedUser("Bob", 256, null),
-            (T) new AvroGeneratedUser("Alice", 128, null),
-            (T) new AvroGeneratedUser("Ted", null, "white"));
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            writeTransform
-                .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
-                .withoutSharding());
-    writePipeline.run();
-
-    PAssert.that(
-            readPipeline.apply(
-                "Read", 
readTransform.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
-        .containsInAnyOrder(values);
+    @Rule public transient ExpectedException expectedException = 
ExpectedException.none();
 
-    readPipeline.run();
-  }
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> params() {
+      return Arrays.asList(new Object[][] {{true}, {false}});
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadGeneratedClassWithClass() throws Throwable {
-    testWriteThenReadGeneratedClass(
-        AvroIO.write(AvroGeneratedUser.class), 
AvroIO.read(AvroGeneratedUser.class));
-  }
+    @Parameterized.Parameter public boolean withBeamSchemas;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadGeneratedClassWithSchema() throws Throwable {
-    testWriteThenReadGeneratedClass(
-        AvroIO.writeGenericRecords(SCHEMA), AvroIO.readGenericRecords(SCHEMA));
-  }
+    @DefaultCoder(AvroCoder.class)
+    static class GenericClass {
+      int intField;
+      String stringField;
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadGeneratedClassWithSchemaString() throws 
Throwable {
-    testWriteThenReadGeneratedClass(
-        AvroIO.writeGenericRecords(SCHEMA.toString()),
-        AvroIO.readGenericRecords(SCHEMA.toString()));
-  }
+      public GenericClass() {}
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    writePipeline.run();
-
-    // Test the same data using all versions of read().
-    PCollection<String> path =
-        readPipeline.apply("Create path", 
Create.of(outputFile.getAbsolutePath()));
-    PAssert.that(
-            readPipeline.apply(
-                "Read", 
AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            readPipeline.apply(
-                "Read withHintMatchesManyFiles",
-                AvroIO.read(GenericClass.class)
-                    .from(outputFile.getAbsolutePath())
-                    .withHintMatchesManyFiles()))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            path.apply(
-                "ReadAll", 
AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            readPipeline.apply(
-                "Parse",
-                AvroIO.parseGenericRecords(new ParseGenericClass())
-                    .from(outputFile.getAbsolutePath())
-                    .withCoder(AvroCoder.of(GenericClass.class))))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            readPipeline.apply(
-                "Parse withHintMatchesManyFiles",
-                AvroIO.parseGenericRecords(new ParseGenericClass())
-                    .from(outputFile.getAbsolutePath())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .withHintMatchesManyFiles()))
-        .containsInAnyOrder(values);
-    PAssert.that(
-            path.apply(
-                "ParseAll",
-                AvroIO.parseAllGenericRecords(new ParseGenericClass())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(values);
-
-    readPipeline.run();
-  }
+      public GenericClass(int intField, String stringField) {
+        this.intField = intField;
+        this.stringField = stringField;
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadMultipleFilepatterns() throws Throwable {
-    List<GenericClass> firstValues = Lists.newArrayList();
-    List<GenericClass> secondValues = Lists.newArrayList();
-    for (int i = 0; i < 10; ++i) {
-      firstValues.add(new GenericClass(i, "a" + i));
-      secondValues.add(new GenericClass(i, "b" + i));
-    }
-    writePipeline
-        .apply("Create first", Create.of(firstValues))
-        .apply(
-            "Write first",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
-                .withNumShards(2));
-    writePipeline
-        .apply("Create second", Create.of(secondValues))
-        .apply(
-            "Write second",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
-                .withNumShards(3));
-    writePipeline.run();
-
-    // Test readAll() and parseAllGenericRecords().
-    PCollection<String> paths =
-        readPipeline.apply(
-            "Create paths",
-            Create.of(
-                tmpFolder.getRoot().getAbsolutePath() + "/first*",
-                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
-    PAssert.that(
-            paths.apply(
-                "Read all", 
AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-    PAssert.that(
-            paths.apply(
-                "Parse all",
-                AvroIO.parseAllGenericRecords(new ParseGenericClass())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-
-    readPipeline.run();
-  }
+      @Override
+      public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+            .add("intField", intField)
+            .add("stringField", stringField)
+            .toString();
+      }
 
-  private static class CreateGenericClass extends SimpleFunction<Long, 
GenericClass> {
-    @Override
-    public GenericClass apply(Long i) {
-      return new GenericClass(i.intValue(), "value" + i);
-    }
-  }
+      @Override
+      public int hashCode() {
+        return Objects.hash(intField, stringField);
+      }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testContinuouslyWriteAndReadMultipleFilepatterns() throws 
Throwable {
-    SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
-    List<GenericClass> firstValues = Lists.newArrayList();
-    List<GenericClass> secondValues = Lists.newArrayList();
-    for (int i = 0; i < 7; ++i) {
-      (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
+      @Override
+      public boolean equals(Object other) {
+        if (other == null || !(other instanceof GenericClass)) {
+          return false;
+        }
+        GenericClass o = (GenericClass) other;
+        return Objects.equals(intField, o.intField) && 
Objects.equals(stringField, o.stringField);
+      }
     }
-    // Configure windowing of the input so that it fires every time a new 
element is generated,
-    // so that files are written continuously.
-    Window<Long> window =
-        Window.<Long>into(FixedWindows.of(Duration.millis(100)))
-            .withAllowedLateness(Duration.ZERO)
-            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
-            .discardingFiredPanes();
-    readPipeline
-        .apply("Sequence first", GenerateSequence.from(0).to(3).withRate(1, 
Duration.millis(300)))
-        .apply("Window first", window)
-        .apply("Map first", MapElements.via(mapFn))
-        .apply(
-            "Write first",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
-                .withNumShards(2)
-                .withWindowedWrites());
-    readPipeline
-        .apply("Sequence second", GenerateSequence.from(3).to(7).withRate(1, 
Duration.millis(300)))
-        .apply("Window second", window)
-        .apply("Map second", MapElements.via(mapFn))
-        .apply(
-            "Write second",
-            AvroIO.write(GenericClass.class)
-                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
-                .withNumShards(3)
-                .withWindowedWrites());
 
-    // Test read(), readAll(), parse(), and parseAllGenericRecords() with 
watchForNewFiles().
-    PAssert.that(
-            readPipeline.apply(
-                "Read",
-                AvroIO.read(GenericClass.class)
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
-        .containsInAnyOrder(firstValues);
-    PAssert.that(
-            readPipeline.apply(
-                "Parse",
-                AvroIO.parseGenericRecords(new ParseGenericClass())
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
-        .containsInAnyOrder(firstValues);
-
-    PCollection<String> paths =
-        readPipeline.apply(
-            "Create paths",
-            Create.of(
-                tmpFolder.getRoot().getAbsolutePath() + "/first*",
-                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
-    PAssert.that(
-            paths.apply(
-                "Read all",
-                AvroIO.readAll(GenericClass.class)
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-    PAssert.that(
-            paths.apply(
-                "Parse all",
-                AvroIO.parseAllGenericRecords(new ParseGenericClass())
-                    .withCoder(AvroCoder.of(GenericClass.class))
-                    .watchForNewFiles(
-                        Duration.millis(100),
-                        
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
-                    .withDesiredBundleSizeBytes(10)))
-        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
-    readPipeline.run();
-  }
+    private static class ParseGenericClass
+        implements SerializableFunction<GenericRecord, GenericClass> {
+      @Override
+      public GenericClass apply(GenericRecord input) {
+        return new GenericClass((int) input.get("intField"), 
input.get("stringField").toString());
+      }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  @Category(NeedsRunner.class)
-  public void testCompressedWriteAndReadASingleFile() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
+      @Test
+      public void testWriteDisplayData() {
+        AvroIO.Write<GenericClass> write =
             AvroIO.write(GenericClass.class)
-                .to(outputFile.getAbsolutePath())
-                .withoutSharding()
-                .withCodec(CodecFactory.deflateCodec(9)));
-    writePipeline.run();
-
-    PAssert.that(
-            
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(values);
-    readPipeline.run();
-
-    try (DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
-      assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
+                .to("/foo")
+                .withShardNameTemplate("-SS-of-NN-")
+                .withSuffix("bar")
+                .withNumShards(100)
+                .withCodec(CodecFactory.deflateCodec(6));
+
+        DisplayData displayData = DisplayData.from(write);
+
+        assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
+        assertThat(displayData, hasDisplayItem("shardNameTemplate", 
"-SS-of-NN-"));
+        assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+        assertThat(
+            displayData,
+            hasDisplayItem(
+                "schema",
+                
"{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io"
+                    + 
".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},"
+                    + "{\"name\":\"stringField\",\"type\":\"string\"}]}"));
+        assertThat(displayData, hasDisplayItem("numShards", 100));
+        assertThat(displayData, hasDisplayItem("codec", 
CodecFactory.deflateCodec(6).toString()));
+      }
     }
-  }
 
-  @Test
-  @SuppressWarnings("unchecked")
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadASingleFileWithNullCodec() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(outputFile.getAbsolutePath())
-                .withoutSharding()
-                .withCodec(CodecFactory.nullCodec()));
-    writePipeline.run();
-
-    PAssert.that(
-            
readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(values);
-    readPipeline.run();
-
-    try (DataFileStream dataFileStream =
-        new DataFileStream(new FileInputStream(outputFile), new 
GenericDatumReader())) {
-      assertEquals("null", dataFileStream.getMetaString("avro.codec"));
+    private enum Sharding {
+      RUNNER_DETERMINED,
+      WITHOUT_SHARDING,
+      FIXED_3_SHARDS
     }
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClassV2 {
-    int intField;
-    String stringField;
-    @Nullable String nullableField;
 
-    public GenericClassV2() {}
-
-    public GenericClassV2(int intValue, String stringValue, String 
nullableValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
-      this.nullableField = nullableValue;
+    private enum WriteMethod {
+      AVROIO_WRITE,
+      AVROIO_SINK
     }
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .add("nullableField", nullableField)
-          .toString();
+    private static final String SCHEMA_STRING =
+        "{\"namespace\": \"example.avro\",\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"AvroGeneratedUser\",\n"
+            + " \"fields\": [\n"
+            + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+            + "     {\"name\": \"favorite_number\", \"type\": [\"int\", 
\"null\"]},\n"
+            + "     {\"name\": \"favorite_color\", \"type\": [\"string\", 
\"null\"]}\n"
+            + " ]\n"
+            + "}";
+
+    private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadJavaClass() throws Throwable {
+      List<GenericClass> values =
+          ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.write(GenericClass.class)
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withoutSharding());
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline.apply(
+                  "Read",
+                  AvroIO.read(GenericClass.class)
+                      .withBeamSchemas(withBeamSchemas)
+                      
.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField, nullableField);
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriteThenReadCustomType() throws Throwable {
+      List<Long> values = Arrays.asList(0L, 1L, 2L);
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              AvroIO.<Long, GenericClass>writeCustomType()
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withFormatFunction(new CreateGenericClass())
+                  .withSchema(ReflectData.get().getSchema(GenericClass.class))
+                  .withoutSharding());
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline
+                  .apply(
+                      "Read",
+                      AvroIO.read(GenericClass.class)
+                          .withBeamSchemas(withBeamSchemas)
+                          
.from(readPipeline.newProvider(outputFile.getAbsolutePath())))
+                  .apply(
+                      MapElements.via(
+                          new SimpleFunction<GenericClass, Long>() {
+                            @Override
+                            public Long apply(GenericClass input) {
+                              return (long) input.intField;
+                            }
+                          })))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
 
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClassV2)) {
-        return false;
-      }
-      GenericClassV2 o = (GenericClassV2) other;
-      return Objects.equals(intField, o.intField)
-          && Objects.equals(stringField, o.stringField)
-          && Objects.equals(nullableField, o.nullableField);
+    private <T extends GenericRecord> void testWriteThenReadGeneratedClass(
+        AvroIO.Write<T> writeTransform, AvroIO.Read<T> readTransform) throws 
Exception {
+      File outputFile = tmpFolder.newFile("output.avro");
+
+      List<T> values =
+          ImmutableList.of(
+              (T) new AvroGeneratedUser("Bob", 256, null),
+              (T) new AvroGeneratedUser("Alice", 128, null),
+              (T) new AvroGeneratedUser("Ted", null, "white"));
+
+      writePipeline
+          .apply(Create.of(values))
+          .apply(
+              writeTransform
+                  .to(writePipeline.newProvider(outputFile.getAbsolutePath()))
+                  .withoutSharding());
+      writePipeline.run();
+
+      PAssert.that(
+              readPipeline.apply(
+                  "Read",
+                  
readTransform.from(readPipeline.newProvider(outputFile.getAbsolutePath()))))
+          .containsInAnyOrder(values);
+
+      readPipeline.run();
     }
-  }
-
-  /**
-   * Tests that {@code AvroIO} can read an upgraded version of an old class, 
as long as the schema
-   * resolution process succeeds. This test covers the case when a new, {@code 
@Nullable} field has
-   * been added.
-   *
-   * <p>For more information, see 
http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteThenReadSchemaUpgrade() throws Throwable {
-    List<GenericClass> values =
-        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, 
"bar"));
-    File outputFile = tmpFolder.newFile("output.avro");
-
-    writePipeline
-        .apply(Create.of(values))
-        
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
-    writePipeline.run();
 
-    List<GenericClassV2> expected =
-        ImmutableList.of(new GenericClassV2(3, "hi", null), new 
GenericClassV2(5, "bar", null));
-
-    PAssert.that(
-            readPipeline.apply(
-                
AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())))
-        .containsInAnyOrder(expected);
-    readPipeline.run();
-  }
-
-  private static class WindowedFilenamePolicy extends FilenamePolicy {
-    final ResourceId outputFilePrefix;
-
-    WindowedFilenamePolicy(ResourceId outputFilePrefix) {
-      this.outputFilePrefix = outputFilePrefix;
+    @Test
+    //@Category(NeedsRunner.class)
 
 Review comment:
   Is it intentional that the `@Category(NeedsRunner.class)` annotation is commented out here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 178031)
    Time Spent: 12h 10m  (was: 12h)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 12h 10m
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to