Removes AvroIO.Write.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1dfd4e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1dfd4e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1dfd4e2 Branch: refs/heads/master Commit: d1dfd4e2a8b82451f28f1f0e6f261eae0d51bb5b Parents: 439f2ca Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Apr 28 18:59:03 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/spark/io/AvroPipelineTest.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 910 ++++++++----------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 37 +- .../apache/beam/sdk/io/AvroIOTransformTest.java | 12 +- 4 files changed, 385 insertions(+), 576 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 62db14f..c58d81e 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -75,7 +75,7 @@ public class AvroPipelineTest { Pipeline p = pipelineRule.createPipeline(); PCollection<GenericRecord> input = p.apply( AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); - input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); + input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema)); p.run().waitUntilFinish(); List<GenericRecord> records = readGenericFile(); http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 2f1d917..4bde6ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -70,24 +70,24 @@ import org.apache.beam.sdk.values.PDone; * // A Read from a GCS file (runs locally and using remote execution): * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); * PCollection<GenericRecord> records = - * p.apply(AvroIO.Read + * p.apply(AvroIO.read() * .from("gs://my_bucket/path/to/records-*.avro") * .withSchema(schema)); * } </pre> * * <p>To write a {@link PCollection} to one or more Avro files, use - * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify + * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to specify * the path of the file to write to (e.g., a local filename or sharded * filename pattern if running locally, or a Google Cloud Storage * filename or sharded filename pattern of the form - * {@code "gs://<bucket>/<filepath>"}). {@link AvroIO.Write#to(FileBasedSink.FilenamePolicy)} + * {@code "gs://<bucket>/<filepath>"}). {@code AvroIO.write().to(FileBasedSink.FilenamePolicy)} * can also be used to specify a custom file naming policy. * * <p>By default, all input is put into the global window before writing. If per-window writes are * desired - for example, when using a streaming runner - - * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be + * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be * preserved. When producing windowed writes, the number of output shards must be set explicitly - * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a + * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a * runner-chosen value, so you may need not set it yourself. A * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce * unique filenames. @@ -103,13 +103,13 @@ import org.apache.beam.sdk.values.PDone; * <pre> {@code * // A simple Write to a local file (only runs locally): * PCollection<AvroAutoGenClass> records = ...; - * records.apply(AvroIO.Write.to("/path/to/file.avro") + * records.apply(AvroIO.write().to("/path/to/file.avro") * .withSchema(AvroAutoGenClass.class)); * * // A Write to a sharded GCS file (runs locally and using remote execution): * Schema schema = new Schema.Parser().parse(new File("schema.avsc")); * PCollection<GenericRecord> records = ...; - * records.apply("WriteToAvro", AvroIO.Write + * records.apply("WriteToAvro", AvroIO.write() * .to("gs://my_bucket/path/to/numbers") * .withSchema(schema) * .withSuffix(".avro")); @@ -149,6 +149,14 @@ public class AvroIO { return readGenericRecords(new Schema.Parser().parse(schema)); } + /** + * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding + * pattern). + */ + public static <T> Write<T> write() { + return new Write<>(null); + } + /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { @@ -229,45 +237,161 @@ public class AvroIO { ///////////////////////////////////////////////////////////////////////////// - /** - * A root {@link PTransform} that writes a {@link PCollection} to an Avro file (or - * multiple Avro files matching a sharding pattern). - */ - public static class Write { + /** Implementation of {@link #write}. */ + public static class Write<T> extends PTransform<PCollection<T>, PDone> { + /** + * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or + * multiple Avro files matching a sharding pattern). + * + * @param <T> the type of each of the elements of the input PCollection + */ + private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + private static final SerializableAvroCodecFactory DEFAULT_CODEC = + new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); + // This should be a multiple of 4 to not get a partial encoded byte. + private static final int METADATA_BYTES_MAX_LENGTH = 40; + + /** The filename to write to. */ + @Nullable + final String filenamePrefix; + /** Suffix to use for each filename. */ + final String filenameSuffix; + /** Requested number of shards. 0 for automatic. */ + final int numShards; + /** Shard template string. */ + final String shardTemplate; + /** The class type of the records. */ + final Class<T> type; + /** The schema of the output file. */ + @Nullable + final Schema schema; + final boolean windowedWrites; + FileBasedSink.FilenamePolicy filenamePolicy; /** - * Returns a {@link PTransform} that writes to the file(s) - * with the given prefix. This can be a local filename + * The codec used to encode the blocks in the Avro file. String value drawn from those in + * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html + */ + final SerializableAvroCodecFactory codec; + /** Avro file metadata. */ + final ImmutableMap<String, Object> metadata; + + Write(Class<T> type) { + this( + null, + null, + "", + 0, + DEFAULT_SHARD_TEMPLATE, + type, + null, + DEFAULT_CODEC, + ImmutableMap.<String, Object>of(), + false, + null); + } + + Write( + String name, + String filenamePrefix, + String filenameSuffix, + int numShards, + String shardTemplate, + Class<T> type, + Schema schema, + SerializableAvroCodecFactory codec, + Map<String, Object> metadata, + boolean windowedWrites, + FileBasedSink.FilenamePolicy filenamePolicy) { + super(name); + this.filenamePrefix = filenamePrefix; + this.filenameSuffix = filenameSuffix; + this.numShards = numShards; + this.shardTemplate = shardTemplate; + this.type = type; + this.schema = schema; + this.codec = codec; + this.windowedWrites = windowedWrites; + this.filenamePolicy = filenamePolicy; + + Map<String, String> badKeys = Maps.newLinkedHashMap(); + for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { + badKeys.put(entry.getKey(), v.getClass().getSimpleName()); + } + } + checkArgument( + badKeys.isEmpty(), + "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys); + this.metadata = ImmutableMap.copyOf(metadata); + } + + /** + * Writes to the file(s) with the given prefix. This can be a local filename * (if running locally), or a Google Cloud Storage filename of * the form {@code "gs://<bucket>/<filepath>"} * (if running locally or using remote execution). * * <p>The files written will begin with this prefix, followed by - * a shard identifier (see {@link Bound#withNumShards}, and end - * in a common extension, if given by {@link Bound#withSuffix}. + * a shard identifier (see {@link #withNumShards}, and end + * in a common extension, if given by {@link #withSuffix}. */ - public static Bound<GenericRecord> to(String prefix) { - return new Bound<>(GenericRecord.class).to(prefix); + public Write<T> to(String filenamePrefix) { + validateOutputComponent(filenamePrefix); + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } - /** - * Returns a {@link PTransform} that writes to the file(s) specified by the provided - * {@link FileBasedSink.FilenamePolicy}. - */ - public static Bound<GenericRecord> to(FileBasedSink.FilenamePolicy filenamePolicy) { - return new Bound<>(GenericRecord.class).to(filenamePolicy); + /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */ + public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } /** - * Returns a {@link PTransform} that writes to the file(s) with the - * given filename suffix. + * Writes to the file(s) with the given filename suffix. + * + * <p>See {@link ShardNameTemplate} for a description of shard templates. */ - public static Bound<GenericRecord> withSuffix(String filenameSuffix) { - return new Bound<>(GenericRecord.class).withSuffix(filenameSuffix); + public Write<T> withSuffix(String filenameSuffix) { + validateOutputComponent(filenameSuffix); + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } /** - * Returns a {@link PTransform} that uses the provided shard count. + * Uses the provided shard count. * * <p>Constraining the number of shards is likely to reduce * the performance of a pipeline. Setting this value is not recommended @@ -275,585 +399,271 @@ public class AvroIO { * * @param numShards the number of shards to use, or 0 to let the system * decide. + * @see ShardNameTemplate */ - public static Bound<GenericRecord> withNumShards(int numShards) { - return new Bound<>(GenericRecord.class).withNumShards(numShards); + public Write<T> withNumShards(int numShards) { + checkArgument(numShards >= 0); + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } /** - * Returns a {@link PTransform} that uses the given shard name - * template. + * Returns a new {@link PTransform} that's like this one but + * that uses the given shard name template. * - * <p>See {@link ShardNameTemplate} for a description of shard templates. + * @see ShardNameTemplate */ - public static Bound<GenericRecord> withShardNameTemplate(String shardTemplate) { - return new Bound<>(GenericRecord.class).withShardNameTemplate(shardTemplate); + public Write<T> withShardNameTemplate(String shardTemplate) { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } /** - * Returns a {@link PTransform} that forces a single file as - * output. + * Forces a single file as output. * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. Setting this value is not recommended - * unless you require a specific number of output files. + * <p>This is a shortcut for {@code .withNumShards(1).withShardNameTemplate("")} */ - public static Bound<GenericRecord> withoutSharding() { - return new Bound<>(GenericRecord.class).withoutSharding(); + public Write<T> withoutSharding() { + return withNumShards(1).withShardNameTemplate(""); } - /** - * Returns a {@link PTransform} that writes Avro file(s) - * containing records whose type is the specified Avro-generated class. - * - * @param <T> the type of the elements of the input PCollection - */ - public static <T> Bound<T> withSchema(Class<T> type) { - return new Bound<>(type).withSchema(type); + public Write<T> withWindowedWrites() { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + true, + filenamePolicy); } /** - * Returns a {@link PTransform} that writes Avro file(s) - * containing records of the specified schema. + * Writes to Avro file(s) containing records whose type is the specified Avro-generated class. */ - public static Bound<GenericRecord> withSchema(Schema schema) { - return new Bound<>(GenericRecord.class).withSchema(schema); + public Write<T> withSchema(Class<T> type) { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + ReflectData.get().getSchema(type), + codec, + metadata, + windowedWrites, + filenamePolicy); } - /** - * Returns a {@link PTransform} that writes Avro file(s) - * containing records of the specified schema in a JSON-encoded - * string form. - */ - public static Bound<GenericRecord> withSchema(String schema) { - return withSchema((new Schema.Parser()).parse(schema)); + /** Writes to Avro file(s) containing records of the specified schema. */ + public Write<GenericRecord> withSchema(Schema schema) { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + GenericRecord.class, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } /** - * Returns a {@link PTransform} that writes Avro file(s) that has GCS path validation on - * pipeline creation disabled. - * - * <p>This can be useful in the case where the GCS output location does - * not exist at the pipeline creation time, but is expected to be available - * at execution time. + * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string + * form. */ - public static Bound<GenericRecord> withoutValidation() { - return new Bound<>(GenericRecord.class).withoutValidation(); + public Write<GenericRecord> withSchema(String schema) { + return withSchema((new Schema.Parser()).parse(schema)); } - /** - * Returns a {@link PTransform} that writes Avro file(s) using specified codec. - */ - public static Bound<GenericRecord> withCodec(CodecFactory codec) { - return new Bound<>(GenericRecord.class).withCodec(codec); + /** Writes to Avro file(s) compressed using specified codec. */ + public Write<T> withCodec(CodecFactory codec) { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + new SerializableAvroCodecFactory(codec), + metadata, + windowedWrites, + filenamePolicy); } /** - * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata. + * Writes to Avro file(s) with the specified metadata. * * <p>Supported value types are String, Long, and byte[]. */ - public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) { - return new Bound<>(GenericRecord.class).withMetadata(metadata); + public Write<T> withMetadata(Map<String, Object> metadata) { + return new Write<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + codec, + metadata, + windowedWrites, + filenamePolicy); } - /** - * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or - * multiple Avro files matching a sharding pattern). - * - * @param <T> the type of each of the elements of the input PCollection - */ - public static class Bound<T> extends PTransform<PCollection<T>, PDone> { - private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; - private static final SerializableAvroCodecFactory DEFAULT_CODEC = - new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); - // This should be a multiple of 4 to not get a partial encoded byte. - private static final int METADATA_BYTES_MAX_LENGTH = 40; - - /** The filename to write to. */ - @Nullable - final String filenamePrefix; - /** Suffix to use for each filename. */ - final String filenameSuffix; - /** Requested number of shards. 0 for automatic. */ - final int numShards; - /** Shard template string. */ - final String shardTemplate; - /** The class type of the records. */ - final Class<T> type; - /** The schema of the output file. */ - @Nullable - final Schema schema; - final boolean windowedWrites; - FileBasedSink.FilenamePolicy filenamePolicy; - - /** - * The codec used to encode the blocks in the Avro file. String value drawn from those in - * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html - */ - final SerializableAvroCodecFactory codec; - /** Avro file metadata. */ - final ImmutableMap<String, Object> metadata; - - Bound(Class<T> type) { - this( - null, - null, - "", - 0, - DEFAULT_SHARD_TEMPLATE, - type, - null, - DEFAULT_CODEC, - ImmutableMap.<String, Object>of(), - false, - null); - } - - Bound( - String name, - String filenamePrefix, - String filenameSuffix, - int numShards, - String shardTemplate, - Class<T> type, - Schema schema, - SerializableAvroCodecFactory codec, - Map<String, Object> metadata, - boolean windowedWrites, - FileBasedSink.FilenamePolicy filenamePolicy) { - super(name); - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.type = type; - this.schema = schema; - this.codec = codec; - this.windowedWrites = windowedWrites; - this.filenamePolicy = filenamePolicy; - - Map<String, String> badKeys = Maps.newLinkedHashMap(); - for (Map.Entry<String, Object> entry : metadata.entrySet()) { - Object v = entry.getValue(); - if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { - badKeys.put(entry.getKey(), v.getClass().getSimpleName()); - } - } - checkArgument( - badKeys.isEmpty(), - "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys); - this.metadata = ImmutableMap.copyOf(metadata); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to the file(s) with the given filename prefix. - * - * <p>See {@link AvroIO.Write#to(String)} for more information - * about filenames. - * - * <p>Does not modify this object. - */ - public Bound<T> to(String filenamePrefix) { - validateOutputComponent(filenamePrefix); - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - public Bound<T> to(FileBasedSink.FilenamePolicy filenamePolicy) { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to the file(s) with the given filename suffix. - * - * <p>See {@link ShardNameTemplate} for a description of shard templates. - * - * <p>Does not modify this object. - */ - public Bound<T> withSuffix(String filenameSuffix) { - validateOutputComponent(filenameSuffix); - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that uses the provided shard count. - * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. Setting this value is not recommended - * unless you require a specific number of output files. - * - * <p>Does not modify this object. - * - * @param numShards the number of shards to use, or 0 to let the system - * decide. - * @see ShardNameTemplate - */ - public Bound<T> withNumShards(int numShards) { - checkArgument(numShards >= 0); - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that uses the given shard name template. - * - * <p>Does not modify this object. - * - * @see ShardNameTemplate - */ - public Bound<T> withShardNameTemplate(String shardTemplate) { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that forces a single file as output. - * - * <p>This is a shortcut for - * {@code .withNumShards(1).withShardNameTemplate("")} - * - * <p>Does not modify this object. - */ - public Bound<T> withoutSharding() { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - 1, - "", - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); - } - - public Bound<T> withWindowedWrites() { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - true, - filenamePolicy); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to Avro file(s) containing records whose type is the - * specified Avro-generated class. - * - * <p>Does not modify this object. - * - * @param <X> the type of the elements of the input PCollection - */ - public <X> Bound<X> withSchema(Class<X> type) { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - ReflectData.get().getSchema(type), - codec, - metadata, - windowedWrites, - filenamePolicy); + @Override + public PDone expand(PCollection<T> input) { + if (filenamePolicy == null && filenamePrefix == null) { + throw new IllegalStateException( + "need to set the filename prefix of an AvroIO.Write transform"); } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to Avro file(s) containing records of the specified - * schema. - * - * <p>Does not modify this object. - */ - public Bound<GenericRecord> withSchema(Schema schema) { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - GenericRecord.class, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + if (filenamePolicy != null && filenamePrefix != null) { + throw new IllegalStateException( + "cannot set both a filename policy and a filename prefix"); } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to Avro file(s) containing records of the specified - * schema in a JSON-encoded string form. - * - * <p>Does not modify this object. - */ - public Bound<GenericRecord> withSchema(String schema) { - return withSchema((new Schema.Parser()).parse(schema)); + if (schema == null) { + throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); } - /** - * Returns a new {@link PTransform} that's like this one but - * that has GCS output path validation on pipeline creation disabled. - * - * <p>Does not modify this object. - * - * <p>This can be useful in the case where the GCS output location does - * not exist at the pipeline creation time, but is expected to be - * available at execution time. - */ - public Bound<T> withoutValidation() { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + WriteFiles<T> write = null; + if (filenamePolicy != null) { + write = WriteFiles.to( + new AvroSink<>( + filenamePolicy, + AvroCoder.of(type, schema), + codec, + metadata)); + } else { + write = WriteFiles.to( + new AvroSink<>( + filenamePrefix, + filenameSuffix, + shardTemplate, + AvroCoder.of(type, schema), + codec, + metadata)); } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to Avro file(s) compressed using specified codec. - * - * <p>Does not modify this object. - */ - public Bound<T> withCodec(CodecFactory codec) { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - new SerializableAvroCodecFactory(codec), - metadata, - windowedWrites, - filenamePolicy); + if (getNumShards() > 0) { + write = write.withNumShards(getNumShards()); } - - /** - * Returns a new {@link PTransform} that's like this one but - * that writes to Avro file(s) with the specified metadata. - * - * <p>Does not modify this object. - */ - public Bound<T> withMetadata(Map<String, Object> metadata) { - return new Bound<>( - name, - filenamePrefix, - filenameSuffix, - numShards, - shardTemplate, - type, - schema, - codec, - metadata, - windowedWrites, - filenamePolicy); + if (windowedWrites) { + write = write.withWindowedWrites(); } + return input.apply("Write", write); + } - @Override - public PDone expand(PCollection<T> input) { - if (filenamePolicy == null && filenamePrefix == null) { - throw new IllegalStateException( - "need to set the filename prefix of an AvroIO.Write transform"); - } - if (filenamePolicy != null && filenamePrefix != null) { - throw new IllegalStateException( - "cannot set both a filename policy and a filename prefix"); - } - if (schema == null) { - throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); - } - - WriteFiles<T> write = null; - if (filenamePolicy != null) { - write = WriteFiles.to( - new AvroSink<>( - filenamePolicy, - AvroCoder.of(type, schema), - codec, - metadata)); - } else { - write = WriteFiles.to( - new AvroSink<>( - filenamePrefix, - filenameSuffix, - shardTemplate, - AvroCoder.of(type, schema), - codec, - metadata)); - } - if (getNumShards() > 0) { - write = write.withNumShards(getNumShards()); - } - if (windowedWrites) { - write = write.withWindowedWrites(); - } - return input.apply("Write", write); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("schema", type) + .withLabel("Record Schema")) + .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix) + .withLabel("Output File Prefix")) + .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate) + .withLabel("Output Shard Name Template"), + DEFAULT_SHARD_TEMPLATE) + .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) + .withLabel("Output File Suffix"), + "") + .addIfNotDefault(DisplayData.item("numShards", numShards) + .withLabel("Maximum Output Shards"), + 0) + .addIfNotDefault(DisplayData.item("codec", codec.toString()) + .withLabel("Avro Compression Codec"), + DEFAULT_CODEC.toString()); + builder.include("Metadata", new Metadata()); + } + private class Metadata implements HasDisplayData { @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .add(DisplayData.item("schema", type) - .withLabel("Record Schema")) - .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix) - .withLabel("Output File Prefix")) - .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate) - .withLabel("Output Shard Name Template"), - DEFAULT_SHARD_TEMPLATE) - .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) - .withLabel("Output File Suffix"), - "") - .addIfNotDefault(DisplayData.item("numShards", numShards) - .withLabel("Maximum Output Shards"), - 0) - .addIfNotDefault(DisplayData.item("codec", codec.toString()) - .withLabel("Avro Compression Codec"), - DEFAULT_CODEC.toString()); - builder.include("Metadata", new Metadata()); - } - - private class Metadata implements HasDisplayData { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - for (Map.Entry<String, Object> entry : metadata.entrySet()) { - DisplayData.Type type = DisplayData.inferType(entry.getValue()); - if (type != null) { - builder.add(DisplayData.item(entry.getKey(), type, entry.getValue())); - } else { - String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue()); - String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH - ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "..."; - builder.add(DisplayData.item(entry.getKey(), repr)); - } + for (Map.Entry<String, Object> entry : metadata.entrySet()) { + DisplayData.Type type = DisplayData.inferType(entry.getValue()); + if (type != null) { + builder.add(DisplayData.item(entry.getKey(), type, entry.getValue())); + } else { + String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue()); + String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH + ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "..."; + builder.add(DisplayData.item(entry.getKey(), repr)); } } } + } - /** - * Returns the current shard name template string. - */ - public String getShardNameTemplate() { - return shardTemplate; - } - - @Override - protected Coder<Void> getDefaultOutputCoder() { - return VoidCoder.of(); - } + /** + * Returns the current shard name template string. + */ + public String getShardNameTemplate() { + return shardTemplate; + } - public String getFilenamePrefix() { - return filenamePrefix; - } + @Override + protected Coder<Void> getDefaultOutputCoder() { + return VoidCoder.of(); + } - public String getShardTemplate() { - return shardTemplate; - } + public String getFilenamePrefix() { + return filenamePrefix; + } - public int getNumShards() { - return numShards; - } + public String getShardTemplate() { + return shardTemplate; + } - public String getFilenameSuffix() { - return filenameSuffix; - } + public int getNumShards() { + return numShards; + } - public Class<T> getType() { - return type; - } + public String getFilenameSuffix() { + return filenameSuffix; + } - public Schema getSchema() { - return schema; - } + public Class<T> getType() { + return type; + } - public CodecFactory getCodec() { - return codec.getCodec(); - } + public Schema getSchema() { + return schema; + } - public Map<String, Object> getMetadata() { - return metadata; - } + public CodecFactory getCodec() { + return codec.getCodec(); } - /** Disallow construction of utility class. */ - private Write() {} + public Map<String, Object> getMetadata() { + return metadata; + } } // Pattern which matches old-style shard output patterns, which are now http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 2144b0d..7df1b18 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -51,7 +51,6 @@ import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.io.AvroIO.Write.Bound; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; @@ -104,7 +103,7 @@ public class AvroIOTest { @Test public void testAvroIOGetName() { assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName()); - assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName()); + assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName()); } @DefaultCoder(AvroCoder.class) @@ -145,7 +144,7 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) .withoutSharding() .withSchema(GenericClass.class)); p.run(); @@ -169,7 +168,7 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) .withoutSharding() .withCodec(CodecFactory.deflateCodec(9)) .withSchema(GenericClass.class)); @@ -196,7 +195,7 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) .withoutSharding() .withSchema(GenericClass.class) .withCodec(CodecFactory.nullCodec())); @@ -264,7 +263,7 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) .withoutSharding() .withSchema(GenericClass.class)); p.run(); @@ -372,7 +371,7 @@ public class AvroIOTest { windowedAvroWritePipeline .apply(values) .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) - .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix)) + .apply(AvroIO.<GenericClass>write().to(new WindowedFilenamePolicy(outputFilePrefix)) .withWindowedWrites() .withNumShards(2) .withSchema(GenericClass.class)); @@ -407,14 +406,14 @@ public class AvroIOTest { @Test public void testWriteWithDefaultCodec() throws Exception { - AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() .to("gs://bucket/foo/baz"); assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString()); } @Test public void testWriteWithCustomCodec() throws Exception { - AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() .to("gs://bucket/foo/baz") .withCodec(CodecFactory.snappyCodec()); assertEquals(SNAPPY_CODEC, write.getCodec().toString()); @@ -423,11 +422,11 @@ public class AvroIOTest { @Test @SuppressWarnings("unchecked") public void testWriteWithSerDeCustomDeflateCodec() throws Exception { - AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() .to("gs://bucket/foo/baz") .withCodec(CodecFactory.deflateCodec(9)); - AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write); + AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write); assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString()); } @@ -435,11 +434,11 @@ public class AvroIOTest { @Test @SuppressWarnings("unchecked") public void testWriteWithSerDeCustomXZCodec() throws Exception { - AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write + AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write() .to("gs://bucket/foo/baz") .withCodec(CodecFactory.xzCodec(9)); - AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write); + AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write); assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString()); } @@ -453,7 +452,7 @@ public class AvroIOTest { File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath()) .withoutSharding() .withSchema(GenericClass.class) .withMetadata(ImmutableMap.<String, Object>of( @@ -475,7 +474,8 @@ public class AvroIOTest { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); String outputFilePrefix = baseOutputFile.getAbsolutePath(); - Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class); + AvroIO.Write<String> write = + AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class); if (numShards > 1) { System.out.println("NumShards " + numShards); write = write.withNumShards(numShards); @@ -556,7 +556,7 @@ public class AvroIOTest { @Test public void testWriteDisplayData() { - AvroIO.Write.Bound<?> write = AvroIO.Write + AvroIO.Write<?> write = AvroIO.<GenericClass>write() .to("foo") .withShardNameTemplate("-SS-of-NN-") .withSuffix("bar") @@ -584,10 +584,9 @@ public class AvroIOTest { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - AvroIO.Write.Bound<?> write = AvroIO.Write + AvroIO.Write<?> write = AvroIO.<GenericRecord>write() .to(outputPath) - .withSchema(Schema.create(Schema.Type.STRING)) - .withoutValidation(); + .withSchema(Schema.create(Schema.Type.STRING)); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("AvroIO.Write should include the file pattern in its primitive transform", http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index b974663..ba7f1b9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -271,16 +271,16 @@ public class AvroIOTransformTest { ImmutableList.<Object[]>builder() .add( new Object[] { - AvroIO.Write.withSchema(AvroGeneratedUser.class), + AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class), generatedClass }, new Object[] { - AvroIO.Write.withSchema(SCHEMA), + AvroIO.write().withSchema(SCHEMA), fromSchema }, new Object[] { - AvroIO.Write.withSchema(SCHEMA_STRING), + AvroIO.write().withSchema(SCHEMA_STRING), fromSchemaString }) .build(); @@ -288,17 +288,17 @@ public class AvroIOTransformTest { @SuppressWarnings("DefaultAnnotationParam") @Parameterized.Parameter(0) - public AvroIO.Write.Bound writeTransform; + public AvroIO.Write writeTransform; @Parameterized.Parameter(1) public String testAlias; - private <T> void runTestWrite(final AvroIO.Write.Bound<T> writeBuilder) + private <T> void runTestWrite(final AvroIO.Write<T> writeBuilder) throws Exception { final File avroFile = tmpFolder.newFile("file.avro"); final AvroGeneratedUser[] users = generateAvroObjects(); - final AvroIO.Write.Bound<T> write = writeBuilder.to(avroFile.getPath()); + final AvroIO.Write<T> write = writeBuilder.to(avroFile.getPath()); @SuppressWarnings("unchecked") final PCollection<T> input =