Converts AvroIO.Write to AutoValue; adds writeGenericRecords()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0d74750 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0d74750 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0d74750 Branch: refs/heads/master Commit: e0d74750da73658a067e7522f18c23c5e622fb2f Parents: abb4916 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Apr 28 19:21:15 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/spark/io/AvroPipelineTest.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 355 +++++-------------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 19 +- .../apache/beam/sdk/io/AvroIOTransformTest.java | 4 +- 4 files changed, 105 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index c58d81e..7188dc5 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -75,7 +75,7 @@ public class AvroPipelineTest { Pipeline p = pipelineRule.createPipeline(); PCollection<GenericRecord> input = p.apply( AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); - input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema)); + input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath())); p.run().waitUntilFinish(); List<GenericRecord> records = readGenericFile(); http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 08fc8a9..8cdd4e7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -154,7 +154,30 @@ public class AvroIO { * pattern). */ public static <T> Write<T> write() { - return new Write<>(null); + return new AutoValue_AvroIO_Write.Builder<T>() + .setFilenameSuffix("") + .setNumShards(0) + .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) + .setCodec(Write.DEFAULT_CODEC) + .setMetadata(ImmutableMap.<String, Object>of()) + .setWindowedWrites(false) + .build(); + } + + /** Writes Avro records of the specified schema. */ + public static Write<GenericRecord> writeGenericRecords(Schema schema) { + return AvroIO.<GenericRecord>write() + .toBuilder() + .setRecordClass(GenericRecord.class) + .setSchema(schema) + .build(); + } + + /** + * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string. + */ + public static Write<GenericRecord> writeGenericRecords(String schema) { + return writeGenericRecords(new Schema.Parser().parse(schema)); } /** Implementation of {@link #read}. */ @@ -229,7 +252,8 @@ public class AvroIO { ///////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. */ - public static class Write<T> extends PTransform<PCollection<T>, PDone> { + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { /** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). @@ -242,80 +266,38 @@ public class AvroIO { // This should be a multiple of 4 to not get a partial encoded byte. private static final int METADATA_BYTES_MAX_LENGTH = 40; - /** The filename to write to. */ - @Nullable - final String filenamePrefix; - /** Suffix to use for each filename. */ - final String filenameSuffix; - /** Requested number of shards. 0 for automatic. */ - final int numShards; - /** Shard template string. */ - final String shardTemplate; - /** The class type of the records. */ - final Class<T> type; - /** The schema of the output file. */ - @Nullable - final Schema schema; - final boolean windowedWrites; - FileBasedSink.FilenamePolicy filenamePolicy; - + @Nullable abstract String getFilenamePrefix(); + abstract String getFilenameSuffix(); + abstract int getNumShards(); + abstract String getShardTemplate(); + abstract Class<T> getRecordClass(); + @Nullable abstract Schema getSchema(); + abstract boolean getWindowedWrites(); + @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy(); /** * The codec used to encode the blocks in the Avro file. String value drawn from those in * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html */ - final SerializableAvroCodecFactory codec; + abstract SerializableAvroCodecFactory getCodec(); /** Avro file metadata. */ - final ImmutableMap<String, Object> metadata; - - Write(Class<T> type) { - this( - null, - null, - "", - 0, - DEFAULT_SHARD_TEMPLATE, - type, - null, - DEFAULT_CODEC, - ImmutableMap.<String, Object>of(), - false, - null); - } + abstract ImmutableMap<String, Object> getMetadata(); - Write( - String name, - String filenamePrefix, - String filenameSuffix, - int numShards, - String shardTemplate, - Class<T> type, - Schema schema, - SerializableAvroCodecFactory codec, - Map<String, Object> metadata, - boolean windowedWrites, - FileBasedSink.FilenamePolicy filenamePolicy) { - super(name); - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.type = type; - this.schema = schema; - this.codec = codec; - this.windowedWrites = windowedWrites; - this.filenamePolicy = filenamePolicy; + abstract Builder<T> toBuilder(); - Map<String, String> badKeys = Maps.newLinkedHashMap(); - for (Map.Entry<String, Object> entry : metadata.entrySet()) { - Object v = entry.getValue(); - if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { - badKeys.put(entry.getKey(), v.getClass().getSimpleName()); - } - } - checkArgument( - badKeys.isEmpty(), - "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys); - this.metadata = ImmutableMap.copyOf(metadata); + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFilenamePrefix(String filenamePrefix); + abstract Builder<T> setFilenameSuffix(String filenameSuffix); + abstract Builder<T> setNumShards(int numShards); + abstract Builder<T> setShardTemplate(String shardTemplate); + abstract Builder<T> setRecordClass(Class<T> recordClass); + abstract Builder<T> setSchema(Schema schema); + abstract Builder<T> setWindowedWrites(boolean windowedWrites); + abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy); + abstract Builder<T> setCodec(SerializableAvroCodecFactory codec); + abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata); + + abstract Write<T> build(); } /** @@ -330,34 +312,12 @@ public class AvroIO { */ public Write<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + return toBuilder().setFilenamePrefix(filenamePrefix).build(); } /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */ public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + return toBuilder().setFilenamePolicy(filenamePolicy).build(); } /** @@ -367,18 +327,7 @@ public class AvroIO { */ public Write<T> withSuffix(String filenameSuffix) { validateOutputComponent(filenameSuffix); - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + return toBuilder().setFilenameSuffix(filenameSuffix).build(); } /** @@ -394,18 +343,7 @@ public class AvroIO { */ public Write<T> withNumShards(int numShards) { checkArgument(numShards >= 0); - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + return toBuilder().setNumShards(numShards).build(); } /** @@ -415,18 +353,7 @@ public class AvroIO { * @see ShardNameTemplate */ public Write<T> withShardNameTemplate(String shardTemplate) { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + return toBuilder().setShardTemplate(shardTemplate).build(); } /** @@ -439,76 +366,19 @@ public class AvroIO { } public Write<T> withWindowedWrites() { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - true, - filenamePolicy); + return toBuilder().setWindowedWrites(true).build(); } /** * Writes to Avro file(s) containing records whose type is the specified Avro-generated class. */ public Write<T> withSchema(Class<T> type) { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - ReflectData.get().getSchema(type), - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - /** Writes to Avro file(s) containing records of the specified schema. */ - public Write<GenericRecord> withSchema(Schema schema) { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - GenericRecord.class, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - /** - * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string - * form. - */ - public Write<GenericRecord> withSchema(String schema) { - return withSchema((new Schema.Parser()).parse(schema)); + return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build(); } /** Writes to Avro file(s) compressed using specified codec. */ public Write<T> withCodec(CodecFactory codec) { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - new SerializableAvroCodecFactory(codec), - metadata, - windowedWrites, - filenamePolicy); + return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build(); } /** @@ -517,56 +387,56 @@ public class AvroIO { * <p>Supported value types are String, Long, and byte[]. */ public Write<T> withMetadata(Map<String, Object> metadata) { - return new Write<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + Map<String, String> badKeys = Maps.newLinkedHashMap(); + for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { + badKeys.put(entry.getKey(), v.getClass().getSimpleName()); + } + } + checkArgument( + badKeys.isEmpty(), + "Metadata value type must be one of String, Long, or byte[]. Found {}", + badKeys); + return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build(); } @Override public PDone expand(PCollection<T> input) { - if (filenamePolicy == null && filenamePrefix == null) { + if (getFilenamePolicy() == null && getFilenamePrefix() == null) { throw new IllegalStateException( "need to set the filename prefix of an AvroIO.Write transform"); } - if (filenamePolicy != null && filenamePrefix != null) { + if (getFilenamePolicy() != null && getFilenamePrefix() != null) { throw new IllegalStateException( "cannot set both a filename policy and a filename prefix"); } - if (schema == null) { + if (getSchema() == null) { throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); } WriteFiles<T> write = null; - if (filenamePolicy != null) { + if (getFilenamePolicy() != null) { write = WriteFiles.to( new AvroSink<>( - filenamePolicy, - AvroCoder.of(type, schema), - codec, - metadata)); + getFilenamePolicy(), + AvroCoder.of(getRecordClass(), getSchema()), + getCodec(), + getMetadata())); } else { write = WriteFiles.to( new AvroSink<>( - filenamePrefix, - filenameSuffix, - shardTemplate, - AvroCoder.of(type, schema), - codec, - metadata)); + getFilenamePrefix(), + getFilenameSuffix(), + getShardTemplate(), + AvroCoder.of(getRecordClass(), getSchema()), + getCodec(), + getMetadata())); } if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } - if (windowedWrites) { + if (getWindowedWrites()) { write = write.withWindowedWrites(); } return input.apply("Write", write); @@ -576,20 +446,20 @@ public class AvroIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .add(DisplayData.item("schema", type) + .add(DisplayData.item("schema", getRecordClass()) .withLabel("Record Schema")) - .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix) + .addIfNotNull(DisplayData.item("filePrefix", getFilenamePrefix()) .withLabel("Output File Prefix")) - .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate) + .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate()) .withLabel("Output Shard Name Template"), DEFAULT_SHARD_TEMPLATE) - .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) + .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix()) .withLabel("Output File Suffix"), "") - .addIfNotDefault(DisplayData.item("numShards", numShards) + .addIfNotDefault(DisplayData.item("numShards", getNumShards()) .withLabel("Maximum Output Shards"), 0) - .addIfNotDefault(DisplayData.item("codec", codec.toString()) + .addIfNotDefault(DisplayData.item("codec", getCodec().toString()) .withLabel("Avro Compression Codec"), DEFAULT_CODEC.toString()); builder.include("Metadata", new Metadata()); @@ -598,7 +468,7 @@ public class AvroIO { private class Metadata implements HasDisplayData { @Override public void populateDisplayData(DisplayData.Builder builder) { - for (Map.Entry<String, Object> entry : metadata.entrySet()) { + for (Map.Entry<String, Object> entry : getMetadata().entrySet()) { DisplayData.Type type = DisplayData.inferType(entry.getValue()); if (type != null) { builder.add(DisplayData.item(entry.getKey(), type, entry.getValue())); @@ -612,49 +482,10 @@ public class AvroIO { } } - /** - * Returns the current shard name template string. - */ - public String getShardNameTemplate() { - return shardTemplate; - } - @Override protected Coder<Void> getDefaultOutputCoder() { return VoidCoder.of(); } - - public String getFilenamePrefix() { - return filenamePrefix; - } - - public String getShardTemplate() { - return shardTemplate; - } - - public int getNumShards() { - return numShards; - } - - public String getFilenameSuffix() { - return filenameSuffix; - } - - public Class<T> getType() { - return type; - } - - public Schema getSchema() { - return schema; - } - - public CodecFactory getCodec() { - return codec.getCodec(); - } - - public Map<String, Object> getMetadata() { - return metadata; - } } // Pattern which matches old-style shard output patterns, which are now http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 38984b5..4abd3e0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -422,9 +422,9 @@ public class AvroIOTest { .to("gs://bucket/foo/baz") .withCodec(CodecFactory.deflateCodec(9)); - AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write); - - assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString()); + assertEquals( + CodecFactory.deflateCodec(9).toString(), + SerializableUtils.clone(write.getCodec()).getCodec().toString()); } @Test @@ -434,9 +434,9 @@ public class AvroIOTest { .to("gs://bucket/foo/baz") .withCodec(CodecFactory.xzCodec(9)); - AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write); - - assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString()); + assertEquals( + CodecFactory.xzCodec(9).toString(), + SerializableUtils.clone(write.getCodec()).getCodec().toString()); } @Test @@ -482,7 +482,7 @@ public class AvroIOTest { p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write); p.run(); - String shardNameTemplate = write.getShardNameTemplate(); + String shardNameTemplate = write.getShardTemplate(); assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate); } @@ -580,9 +580,8 @@ public class AvroIOTest { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - AvroIO.Write<?> write = AvroIO.<GenericRecord>write() - .to(outputPath) - .withSchema(Schema.create(Schema.Type.STRING)); + AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING)) + .to(outputPath); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("AvroIO.Write should include the file pattern in its primitive transform", http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 51c9691..fb57d5c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -275,12 +275,12 @@ public class AvroIOTransformTest { generatedClass }, new Object[] { - AvroIO.write().withSchema(SCHEMA), + AvroIO.writeGenericRecords(SCHEMA), fromSchema }, new Object[] { - AvroIO.write().withSchema(SCHEMA_STRING), + AvroIO.writeGenericRecords(SCHEMA_STRING), fromSchemaString }) .build();