Removes AvroIO.Read.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1499d256 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1499d256 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1499d256 Branch: refs/heads/master Commit: 1499d256c616e34b4416fa202a45aa256ac88d20 Parents: 0166e19 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Apr 28 18:19:21 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/spark/io/AvroPipelineTest.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 222 +++++++------------ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 24 +- .../apache/beam/sdk/io/AvroIOTransformTest.java | 18 +- 4 files changed, 108 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 2a73c28..e3a44d2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -74,7 +74,7 @@ public class AvroPipelineTest { Pipeline p = pipelineRule.createPipeline(); PCollection<GenericRecord> input = p.apply( - AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); + AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema)); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); p.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 75e14d5..abde9cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -63,7 +63,7 @@ import org.apache.beam.sdk.values.PDone; * * // A simple Read of a local file (only runs locally): * PCollection<AvroAutoGenClass> records = - * p.apply(AvroIO.Read.from("/path/to/file.avro") + * p.apply(AvroIO.read().from("/path/to/file.avro") * .withSchema(AvroAutoGenClass.class)); * * // A Read from a GCS file (runs locally and using remote execution): @@ -125,15 +125,39 @@ import org.apache.beam.sdk.values.PDone; */ public class AvroIO { /** - * A root {@link PTransform} that reads from an Avro file (or multiple Avro - * files matching a pattern) and returns a {@link PCollection} containing - * the decoding of each record. + * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). + * + * <p>The schema must be specified using one of the {@code withSchema} functions. */ - public static class Read { + public static <T> Read<T> read() { + return new Read<>(); + } + + /** Implementation of {@link #read}. */ + public static class Read<T> extends PTransform<PBegin, PCollection<T>> { + /** The filepattern to read from. */ + @Nullable + final String filepattern; + /** The class type of the records. */ + @Nullable + final Class<T> type; + /** The schema of the input file. */ + @Nullable + final Schema schema; + + Read() { + this(null, null, null, null); + } + + Read(String name, String filepattern, Class<T> type, Schema schema) { + super(name); + this.filepattern = filepattern; + this.type = type; + this.schema = schema; + } /** - * Returns a {@link PTransform} that reads from the file(s) - * with the given name or pattern. This can be a local filename + * Reads from the file(s) with the given name or pattern. This can be a local filename * or filename pattern (if running locally), or a Google Cloud * Storage filename or filename pattern of the form * {@code "gs://<bucket>/<filepath>"} (if running locally or @@ -141,162 +165,82 @@ public class AvroIO { * <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java * Filesystem glob patterns</a> ("*", "?", "[..]") are supported. */ - public static Bound<GenericRecord> from(String filepattern) { - return new Bound<>(GenericRecord.class).from(filepattern); + public Read<T> from(String filepattern) { + return new Read<>(name, filepattern, type, schema); } /** - * Returns a {@link PTransform} that reads Avro file(s) - * containing records whose type is the specified Avro-generated class. - * - * @param <T> the type of the decoded elements, and the elements - * of the resulting {@link PCollection} + * Returns a new {@link PTransform} that's like this one but + * that reads Avro file(s) containing records whose type is the + * specified Avro-generated class. */ - public static <T> Bound<T> withSchema(Class<T> type) { - return new Bound<>(type).withSchema(type); + public Read<T> withSchema(Class<T> type) { + return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type)); } /** - * Returns a {@link PTransform} that reads Avro file(s) - * containing records of the specified schema. + * Returns a new {@link PTransform} that's like this one but + * that reads Avro file(s) containing records of the specified schema. */ - public static Bound<GenericRecord> withSchema(Schema schema) { - return new Bound<>(GenericRecord.class).withSchema(schema); + public Read<GenericRecord> withSchema(Schema schema) { + return new Read<>(name, filepattern, GenericRecord.class, schema); } /** - * Returns a {@link PTransform} that reads Avro file(s) - * containing records of the specified schema in a JSON-encoded - * string form. + * Returns a new {@link PTransform} that's like this one but + * that reads Avro file(s) containing records of the specified schema + * in a JSON-encoded string form. + * + * <p>Does not modify this object. */ - public static Bound<GenericRecord> withSchema(String schema) { + public Read<GenericRecord> withSchema(String schema) { return withSchema((new Schema.Parser()).parse(schema)); } - /** - * A {@link PTransform} that reads from an Avro file (or multiple Avro - * files matching a pattern) and returns a bounded {@link PCollection} containing - * the decoding of each record. - * - * @param <T> the type of each of the elements of the resulting - * PCollection - */ - public static class Bound<T> extends PTransform<PBegin, PCollection<T>> { - /** The filepattern to read from. */ - @Nullable - final String filepattern; - /** The class type of the records. */ - final Class<T> type; - /** The schema of the input file. */ - @Nullable - final Schema schema; - - Bound(Class<T> type) { - this(null, null, type, null); + @Override + public PCollection<T> expand(PBegin input) { + if (filepattern == null) { + throw new IllegalStateException( + "need to set the filepattern of an AvroIO.Read transform"); } - - Bound(String name, String filepattern, Class<T> type, Schema schema) { - super(name); - this.filepattern = filepattern; - this.type = type; - this.schema = schema; + if (schema == null) { + throw new IllegalStateException("need to set the schema of an AvroIO.Read transform"); } - /** - * Returns a new {@link PTransform} that's like this one but - * that reads from the file(s) with the given name or pattern. - * (See {@link AvroIO.Read#from} for a description of - * filepatterns.) - * - * <p>Does not modify this object. - */ - public Bound<T> from(String filepattern) { - return new Bound<>(name, filepattern, type, schema); - } + @SuppressWarnings("unchecked") + Bounded<T> read = + type == GenericRecord.class + ? (Bounded<T>) org.apache.beam.sdk.io.Read.from( + AvroSource.from(filepattern).withSchema(schema)) + : org.apache.beam.sdk.io.Read.from( + AvroSource.from(filepattern).withSchema(type)); - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records whose type is the - * specified Avro-generated class. - * - * <p>Does not modify this object. - * - * @param <X> the type of the decoded elements and the elements of - * the resulting PCollection - */ - public <X> Bound<X> withSchema(Class<X> type) { - return new Bound<>(name, filepattern, type, ReflectData.get().getSchema(type)); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records of the specified schema. - * - * <p>Does not modify this object. - */ - public Bound<GenericRecord> withSchema(Schema schema) { - return new Bound<>(name, filepattern, GenericRecord.class, schema); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records of the specified schema - * in a JSON-encoded string form. - * - * <p>Does not modify this object. - */ - public Bound<GenericRecord> withSchema(String schema) { - return withSchema((new Schema.Parser()).parse(schema)); - } - - @Override - public PCollection<T> expand(PBegin input) { - if (filepattern == null) { - throw new IllegalStateException( - "need to set the filepattern of an AvroIO.Read transform"); - } - if (schema == null) { - throw new IllegalStateException("need to set the schema of an AvroIO.Read transform"); - } - - @SuppressWarnings("unchecked") - Bounded<T> read = - type == GenericRecord.class - ? (Bounded<T>) org.apache.beam.sdk.io.Read.from( - AvroSource.from(filepattern).withSchema(schema)) - : org.apache.beam.sdk.io.Read.from( - AvroSource.from(filepattern).withSchema(type)); - - PCollection<T> pcol = input.getPipeline().apply("Read", read); - // Honor the default output coder that would have been used by this PTransform. - pcol.setCoder(getDefaultOutputCoder()); - return pcol; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("filePattern", filepattern) - .withLabel("Input File Pattern")); - } + PCollection<T> pcol = input.getPipeline().apply("Read", read); + // Honor the default output coder that would have been used by this PTransform. + pcol.setCoder(getDefaultOutputCoder()); + return pcol; + } - @Override - protected Coder<T> getDefaultOutputCoder() { - return AvroCoder.of(type, schema); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("filePattern", filepattern) + .withLabel("Input File Pattern")); + } - public String getFilepattern() { - return filepattern; - } + @Override + protected Coder<T> getDefaultOutputCoder() { + return AvroCoder.of(type, schema); + } - public Schema getSchema() { - return schema; - } + public String getFilepattern() { + return filepattern; } - /** Disallow construction of utility class. */ - private Read() {} + public Schema getSchema() { + return schema; + } } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index ece7997..6d842b3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -103,7 +103,7 @@ public class AvroIOTest { @Test public void testAvroIOGetName() { - assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName()); + assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName()); assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName()); } @@ -150,8 +150,11 @@ public class AvroIOTest { .withSchema(GenericClass.class)); p.run(); - PCollection<GenericClass> input = p - .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class)); + PCollection<GenericClass> input = + p.apply( + AvroIO.<GenericClass>read() + .from(outputFile.getAbsolutePath()) + .withSchema(GenericClass.class)); PAssert.that(input).containsInAnyOrder(values); p.run(); @@ -173,7 +176,7 @@ public class AvroIOTest { p.run(); PCollection<GenericClass> input = p - .apply(AvroIO.Read + .apply(AvroIO.<GenericClass>read() .from(outputFile.getAbsolutePath()) .withSchema(GenericClass.class)); @@ -200,7 +203,7 @@ public class AvroIOTest { p.run(); PCollection<GenericClass> input = p - .apply(AvroIO.Read + .apply(AvroIO.<GenericClass>read() .from(outputFile.getAbsolutePath()) .withSchema(GenericClass.class)); @@ -269,8 +272,11 @@ public class AvroIOTest { List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)); - PCollection<GenericClassV2> input = p - .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class)); + PCollection<GenericClassV2> input = + p.apply( + AvroIO.<GenericClassV2>read() + .from(outputFile.getAbsolutePath()) + .withSchema(GenericClassV2.class)); PAssert.that(input).containsInAnyOrder(expected); p.run(); @@ -533,7 +539,7 @@ public class AvroIOTest { @Test public void testReadDisplayData() { - AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*"); + AvroIO.Read<?> read = AvroIO.read().from("foo.*"); DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); @@ -544,7 +550,7 @@ public class AvroIOTest { public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*") + AvroIO.Read<?> read = AvroIO.read().from("foo.*") .withSchema(Schema.create(Schema.Type.STRING)); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 3cf52a4..06b9841 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -138,13 +138,13 @@ public class AvroIOTransformTest { } private <T> void runTestRead(@Nullable final String applyName, - final AvroIO.Read.Bound<T> readBuilder, + final AvroIO.Read<T> readBuilder, final String expectedName, final T[] expectedOutput) throws Exception { final File avroFile = tmpFolder.newFile("file.avro"); generateAvroFile(generateAvroObjects(), avroFile); - final AvroIO.Read.Bound<T> read = readBuilder.from(avroFile.getPath()); + final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath()); final PCollection<T> output = applyName == null ? pipeline.apply(read) : pipeline.apply(applyName, read); @@ -169,14 +169,14 @@ public class AvroIOTransformTest { // test read using generated class new Object[] { null, - AvroIO.Read.withSchema(AvroGeneratedUser.class), + AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class), "AvroIO.Read/Read.out", generateAvroObjects(), generatedClass }, new Object[] { "MyRead", - AvroIO.Read.withSchema(AvroGeneratedUser.class), + AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class), "MyRead/Read.out", generateAvroObjects(), generatedClass @@ -185,14 +185,14 @@ public class AvroIOTransformTest { // test read using schema object new Object[] { null, - AvroIO.Read.withSchema(SCHEMA), + AvroIO.read().withSchema(SCHEMA), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchema }, new Object[] { "MyRead", - AvroIO.Read.withSchema(SCHEMA), + AvroIO.read().withSchema(SCHEMA), "MyRead/Read.out", generateAvroGenericRecords(), fromSchema @@ -201,14 +201,14 @@ public class AvroIOTransformTest { // test read using schema string new Object[] { null, - AvroIO.Read.withSchema(SCHEMA_STRING), + AvroIO.read().withSchema(SCHEMA_STRING), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchemaString }, new Object[] { "MyRead", - AvroIO.Read.withSchema(SCHEMA_STRING), + AvroIO.read().withSchema(SCHEMA_STRING), "MyRead/Read.out", generateAvroGenericRecords(), fromSchemaString @@ -221,7 +221,7 @@ public class AvroIOTransformTest { public String transformName; @Parameterized.Parameter(1) - public AvroIO.Read.Bound readTransform; + public AvroIO.Read readTransform; @Parameterized.Parameter(2) public String expectedReadTransformName;