http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index fe0b97d..3198829 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -36,10 +36,12 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; @@ -73,9 +75,9 @@ public class TFRecordIO { */ public static Write write() { return new AutoValue_TFRecordIO_Write.Builder() - .setFilenameSuffix("") + .setShardTemplate(null) + .setFilenameSuffix(null) .setNumShards(0) - .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) .setCompressionType(CompressionType.NONE) .build(); } @@ -212,7 +214,7 @@ public class TFRecordIO { @Override protected Coder<byte[]> getDefaultOutputCoder() { - return DEFAULT_BYTE_ARRAY_CODER; + return ByteArrayCoder.of(); } } @@ -221,20 +223,17 @@ public class TFRecordIO { /** Implementation of {@link #write}. 
*/ @AutoValue public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> { - private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; - - /** The prefix of each file written, combined with suffix and shardTemplate. */ - @Nullable - abstract ValueProvider<String> getFilenamePrefix(); + /** The directory to which files will be written. */ + @Nullable abstract ValueProvider<ResourceId> getOutputPrefix(); /** The suffix of each file written, combined with prefix and shardTemplate. */ - abstract String getFilenameSuffix(); + @Nullable abstract String getFilenameSuffix(); /** Requested number of shards. 0 for automatic. */ abstract int getNumShards(); /** The shard template of each file written, combined with prefix and suffix. */ - abstract String getShardTemplate(); + @Nullable abstract String getShardTemplate(); /** Option to indicate the output sink's compression type. Default is NONE. */ abstract CompressionType getCompressionType(); @@ -243,38 +242,51 @@ public class TFRecordIO { @AutoValue.Builder abstract static class Builder { - abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix); + abstract Builder setOutputPrefix(ValueProvider<ResourceId> outputPrefix); + + abstract Builder setShardTemplate(String shardTemplate); abstract Builder setFilenameSuffix(String filenameSuffix); abstract Builder setNumShards(int numShards); - abstract Builder setShardTemplate(String shardTemplate); - abstract Builder setCompressionType(CompressionType compressionType); abstract Write build(); } /** - * Writes to TFRecord file(s) with the given prefix. This can be a local filename - * (if running locally), or a Google Cloud Storage filename of - * the form {@code "gs://<bucket>/<filepath>"} - * (if running locally or using remote execution). + * Writes TFRecord file(s) with the given output prefix. The {@code prefix} will be used + * to generate a {@link ResourceId} using any supported {@link FileSystem}. 
+ * + * <p>In addition to their prefix, created files will have a shard identifier (see + * {@link #withNumShards(int)}), and end in a common suffix, if given by + * {@link #withSuffix(String)}. + * + * <p>For more information on filenames, see {@link DefaultFilenamePolicy}. + */ + public Write to(String outputPrefix) { + return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); + } + + /** + * Writes TFRecord file(s) with a prefix given by the specified resource. + * + * <p>In addition to their prefix, created files will have a shard identifier (see + * {@link #withNumShards(int)}), and end in a common suffix, if given by + * {@link #withSuffix(String)}. * - * <p>The files written will begin with this prefix, followed by - * a shard identifier (see {@link #withNumShards(int)}, and end - * in a common extension, if given by {@link #withSuffix(String)}. + * <p>For more information on filenames, see {@link DefaultFilenamePolicy}. */ - public Write to(String filenamePrefix) { - return to(StaticValueProvider.of(filenamePrefix)); + public Write to(ResourceId outputResource) { + return toResource(StaticValueProvider.of(outputResource)); } /** - * Like {@link #to(String)}, but with a {@link ValueProvider}. + * Like {@link #to(ResourceId)}. 
*/ - public Write to(ValueProvider<String> filenamePrefix) { - return toBuilder().setFilenamePrefix(filenamePrefix).build(); + public Write toResource(ValueProvider<ResourceId> outputResource) { + return toBuilder().setOutputPrefix(outputResource).build(); } /** @@ -282,8 +294,8 @@ public class TFRecordIO { * * @see ShardNameTemplate */ - public Write withSuffix(String nameExtension) { - return toBuilder().setFilenameSuffix(nameExtension).build(); + public Write withSuffix(String suffix) { + return toBuilder().setFilenameSuffix(suffix).build(); } /** @@ -298,7 +310,7 @@ public class TFRecordIO { * @see ShardNameTemplate */ public Write withNumShards(int numShards) { - checkArgument(numShards >= 0); + checkArgument(numShards >= 0, "Number of shards %s must be >= 0", numShards); return toBuilder().setNumShards(numShards).build(); } @@ -338,16 +350,13 @@ public class TFRecordIO { @Override public PDone expand(PCollection<byte[]> input) { - if (getFilenamePrefix() == null) { - throw new IllegalStateException( - "need to set the filename prefix of a TFRecordIO.Write transform"); - } - org.apache.beam.sdk.io.WriteFiles<byte[]> write = - org.apache.beam.sdk.io.WriteFiles.to( + checkState(getOutputPrefix() != null, + "need to set the output prefix of a TFRecordIO.Write transform"); + WriteFiles<byte[]> write = WriteFiles.to( new TFRecordSink( - getFilenamePrefix(), - getFilenameSuffix(), + getOutputPrefix(), getShardTemplate(), + getFilenameSuffix(), getCompressionType())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); @@ -359,20 +368,23 @@ public class TFRecordIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String prefixString = getFilenamePrefix().isAccessible() - ? 
getFilenamePrefix().get() : getFilenamePrefix().toString(); + String outputPrefixString = null; + if (getOutputPrefix().isAccessible()) { + ResourceId dir = getOutputPrefix().get(); + outputPrefixString = dir.toString(); + } else { + outputPrefixString = getOutputPrefix().toString(); + } builder - .addIfNotNull(DisplayData.item("filePrefix", prefixString) + .add(DisplayData.item("filePrefix", outputPrefixString) .withLabel("Output File Prefix")) - .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix()) - .withLabel("Output File Suffix"), "") - .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate()) - .withLabel("Output Shard Name Template"), - DEFAULT_SHARD_TEMPLATE) + .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) + .withLabel("Output File Suffix")) + .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate()) + .withLabel("Output Shard Name Template")) .addIfNotDefault(DisplayData.item("numShards", getNumShards()) .withLabel("Maximum Output Shards"), 0) - .add(DisplayData - .item("compressionType", getCompressionType().toString()) + .add(DisplayData.item("compressionType", getCompressionType().toString()) .withLabel("Compression Type")); } @@ -537,14 +549,24 @@ public class TFRecordIO { @VisibleForTesting static class TFRecordSink extends FileBasedSink<byte[]> { @VisibleForTesting - TFRecordSink(ValueProvider<String> baseOutputFilename, - String extension, - String fileNameTemplate, - TFRecordIO.CompressionType compressionType) { - super(baseOutputFilename, extension, fileNameTemplate, + TFRecordSink(ValueProvider<ResourceId> outputPrefix, + @Nullable String shardTemplate, + @Nullable String suffix, + TFRecordIO.CompressionType compressionType) { + super( + outputPrefix, + DefaultFilenamePolicy.constructUsingStandardParameters( + outputPrefix, shardTemplate, suffix), writableByteChannelFactory(compressionType)); } + private static class ExtractDirectory implements SerializableFunction<ResourceId, 
ResourceId> { + @Override + public ResourceId apply(ResourceId input) { + return input.getCurrentDirectory(); + } + } + @Override public FileBasedWriteOperation<byte[]> createWriteOperation() { return new TFRecordWriteOperation(this); @@ -575,7 +597,7 @@ public class TFRecordIO { } @Override - public FileBasedWriter<byte[]> createWriter(PipelineOptions options) throws Exception { + public FileBasedWriter<byte[]> createWriter() throws Exception { return new TFRecordWriter(this); } }
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 6b08e1f..dbfaeee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import javax.annotation.Nullable; @@ -28,9 +29,12 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -39,16 +43,13 @@ import org.apache.beam.sdk.values.PDone; /** * {@link PTransform}s for reading and writing text files. * - * <p>To read a {@link PCollection} from one or more text files, use {@link TextIO.Read}. 
- * You can instantiate a transform using {@link TextIO.Read#from(String)} to specify - * the path of the file(s) to read from (e.g., a local filename or - * filename pattern if running locally, or a Google Cloud Storage - * filename or filename pattern of the form - * {@code "gs://<bucket>/<filepath>"}). + * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to + * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the + * file(s) to be read. * - * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, - * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', - * '\r', or '\r\n'). + * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each + * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', + * or '\r\n'). * * <p>Example: * @@ -56,16 +57,11 @@ import org.apache.beam.sdk.values.PDone; * Pipeline p = ...; * * // A simple Read of a local file (only runs locally): - * PCollection<String> lines = - * p.apply(TextIO.read().from("/local/path/to/file.txt")); + * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt")); * }</pre> * - * <p>To write a {@link PCollection} to one or more text files, use - * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify - * the path of the file to write to (e.g., a local filename or sharded - * filename pattern if running locally, or a Google Cloud Storage - * filename or sharded filename pattern of the form - * {@code "gs://<bucket>/<filepath>"}). + * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using + * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write. * * <p>By default, all input is put into the global window before writing. 
If per-window writes are * desired - for example, when using a streaming runner - @@ -75,8 +71,7 @@ import org.apache.beam.sdk.values.PDone; * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be * set, and unique windows and triggers must produce unique filenames. * - * <p>Any existing files with the same names as generated output files - * will be overwritten. + * <p>Any existing files with the same names as generated output files will be overwritten. * * <p>For example: * <pre>{@code @@ -93,25 +88,27 @@ import org.apache.beam.sdk.values.PDone; */ public class TextIO { /** - * Reads from one or more text files and returns a bounded {@link PCollection} containing one - * element for each line of the input files. + * A {@link PTransform} that reads from one or more text files and returns a bounded + * {@link PCollection} containing one element for each line of the input files. */ public static Read read() { return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build(); } /** - * A {@link PTransform} that writes a {@link PCollection} to text file (or - * multiple text files matching a sharding pattern), with each - * element of the input collection encoded into its own line. + * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files + * matching a sharding pattern), with each element of the input collection encoded into its own + * line. */ public static Write write() { return new AutoValue_TextIO_Write.Builder() - .setFilenameSuffix("") - .setNumShards(0) - .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) + .setFilenamePrefix(null) + .setShardTemplate(null) + .setFilenameSuffix(null) + .setFilenamePolicy(null) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) + .setNumShards(0) .build(); } @@ -228,13 +225,11 @@ public class TextIO { /** Implementation of {@link #write}. 
*/ @AutoValue public abstract static class Write extends PTransform<PCollection<String>, PDone> { - private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; - /** The prefix of each file written, combined with suffix and shardTemplate. */ - @Nullable abstract ValueProvider<String> getFilenamePrefix(); + @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix(); /** The suffix of each file written, combined with prefix and shardTemplate. */ - abstract String getFilenameSuffix(); + @Nullable abstract String getFilenameSuffix(); /** An optional header to add to each file. */ @Nullable abstract String getHeader(); @@ -246,7 +241,7 @@ public class TextIO { abstract int getNumShards(); /** The shard template of each file written, combined with prefix and suffix. */ - abstract String getShardTemplate(); + @Nullable abstract String getShardTemplate(); /** A policy for naming output files. */ @Nullable abstract FilenamePolicy getFilenamePolicy(); @@ -264,13 +259,13 @@ public class TextIO { @AutoValue.Builder abstract static class Builder { - abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix); - abstract Builder setFilenameSuffix(String filenameSuffix); - abstract Builder setHeader(String header); - abstract Builder setFooter(String footer); + abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix); + abstract Builder setShardTemplate(@Nullable String shardTemplate); + abstract Builder setFilenameSuffix(@Nullable String filenameSuffix); + abstract Builder setHeader(@Nullable String header); + abstract Builder setFooter(@Nullable String footer); + abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy); abstract Builder setNumShards(int numShards); - abstract Builder setShardTemplate(String shardTemplate); - abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy); abstract Builder setWindowedWrites(boolean windowedWrites); abstract Builder setWritableByteChannelFactory( 
WritableByteChannelFactory writableByteChannelFactory); @@ -279,72 +274,115 @@ public class TextIO { } /** - * Writes to text files with the given prefix. This can be a local filename - * (if running locally), or a Google Cloud Storage filename of - * the form {@code "gs://<bucket>/<filepath>"} - * (if running locally or using remote execution). + * Writes to text files with the given prefix. The given {@code prefix} can reference any + * {@link FileSystem} on the classpath. + * + * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. + * + * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix + * to define the base output directory and file prefix, a shard identifier (see + * {@link #withNumShards(int)}), and a common suffix (if supplied using + * {@link #withSuffix(String)}). * - * <p>The files written will begin with this prefix, followed by - * a shard identifier (see {@link #withNumShards(int)}, and end - * in a common extension, if given by {@link #withSuffix(String)}. + * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)}, + * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should + * not be set. */ public Write to(String filenamePrefix) { - return to(StaticValueProvider.of(filenamePrefix)); + return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)); } - /** Like {@link #to(String)}, but with a {@link ValueProvider}. */ - public Write to(ValueProvider<String> filenamePrefix) { - return toBuilder().setFilenamePrefix(filenamePrefix).build(); + /** + * Writes to text files with prefix from the given resource. + * + * <p>The name of the output files will be determined by the {@link FilenamePolicy} used. 
+ * + * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix + * to define the base output directory and file prefix, a shard identifier (see + * {@link #withNumShards(int)}), and a common suffix (if supplied using + * {@link #withSuffix(String)}). + * + * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)}, + * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should + * not be set. + */ + public Write to(ResourceId filenamePrefix) { + return toResource(StaticValueProvider.of(filenamePrefix)); } - /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */ - public Write to(FilenamePolicy filenamePolicy) { - return toBuilder().setFilenamePolicy(filenamePolicy).build(); + /** + * Like {@link #to(String)}. + */ + public Write to(ValueProvider<String> outputPrefix) { + return toResource(NestedValueProvider.of(outputPrefix, + new SerializableFunction<String, ResourceId>() { + @Override + public ResourceId apply(String input) { + return FileBasedSink.convertToFileResourceIfPossible(input); + } + })); } /** - * Writes to the file(s) with the given filename suffix. - * - * @see ShardNameTemplate + * Like {@link #to(ResourceId)}. */ - public Write withSuffix(String nameExtension) { - return toBuilder().setFilenameSuffix(nameExtension).build(); + public Write toResource(ValueProvider<ResourceId> filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); } /** - * Uses the provided shard count. + * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be + * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. Setting this value is not recommended - * unless you require a specific number of output files. 
+ * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are + * used. + */ + public Write withShardNameTemplate(String shardTemplate) { + return toBuilder().setShardTemplate(shardTemplate).build(); + } + + /** + * Configures the filename suffix for written files. This option may only be used when + * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured. * - * @param numShards the number of shards to use, or 0 to let the system - * decide. - * @see ShardNameTemplate + * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are + * used. */ - public Write withNumShards(int numShards) { - checkArgument(numShards >= 0); - return toBuilder().setNumShards(numShards).build(); + public Write withSuffix(String filenameSuffix) { + return toBuilder().setFilenameSuffix(filenameSuffix).build(); } /** - * Uses the given shard name template. + * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files. + */ + public Write withFilenamePolicy(FilenamePolicy filenamePolicy) { + return toBuilder().setFilenamePolicy(filenamePolicy).build(); + } + + /** + * Configures the number of output shards produced overall (when using unwindowed writes) or + * per-window (when using windowed writes). + * + * <p>For unwindowed writes, constraining the number of shards is likely to reduce the + * performance of a pipeline. Setting this value is not recommended unless you require a + * specific number of output files. * - * @see ShardNameTemplate + * @param numShards the number of shards to use, or 0 to let the system decide. */ - public Write withShardNameTemplate(String shardTemplate) { - return toBuilder().setShardTemplate(shardTemplate).build(); + public Write withNumShards(int numShards) { + checkArgument(numShards >= 0); + return toBuilder().setNumShards(numShards).build(); } /** - * Forces a single file as output. 
+ * Forces a single file as output and empty shard name template. This option is only compatible + * with unwindowed writes. * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. Using this setting is not recommended - * unless you truly require a single output file. + * <p>For unwindowed writes, constraining the number of shards is likely to reduce the + * performance of a pipeline. Setting this value is not recommended unless you require a + * specific number of output files. * - * <p>This is a shortcut for - * {@code .withNumShards(1).withShardNameTemplate("")} + * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")} */ public Write withoutSharding() { return withNumShards(1).withShardNameTemplate(""); @@ -386,34 +424,26 @@ public class TextIO { @Override public PDone expand(PCollection<String> input) { - if (getFilenamePolicy() == null && getFilenamePrefix() == null) { - throw new IllegalStateException( - "need to set the filename prefix of an TextIO.Write transform"); - } - if (getFilenamePolicy() != null && getFilenamePrefix() != null) { - throw new IllegalStateException( - "cannot set both a filename policy and a filename prefix"); - } - WriteFiles<String> write; - if (getFilenamePolicy() != null) { - write = - WriteFiles.to( - new TextSink( - getFilenamePolicy(), - getHeader(), - getFooter(), - getWritableByteChannelFactory())); - } else { - write = - WriteFiles.to( - new TextSink( - getFilenamePrefix(), - getFilenameSuffix(), - getHeader(), - getFooter(), - getShardTemplate(), - getWritableByteChannelFactory())); + checkState(getFilenamePrefix() != null, + "Need to set the filename prefix of a TextIO.Write transform."); + checkState( + (getFilenamePolicy() == null) + || (getShardTemplate() == null && getFilenameSuffix() == null), + "Cannot set a filename policy and also a filename template or suffix."); + + FilenamePolicy usedFilenamePolicy = getFilenamePolicy(); + if (usedFilenamePolicy 
== null) { + usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters( + getFilenamePrefix(), getShardTemplate(), getFilenameSuffix()); } + WriteFiles<String> write = + WriteFiles.to( + new TextSink( + getFilenamePrefix(), + usedFilenamePolicy, + getHeader(), + getFooter(), + getWritableByteChannelFactory())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -430,16 +460,15 @@ public class TextIO { String prefixString = ""; if (getFilenamePrefix() != null) { prefixString = getFilenamePrefix().isAccessible() - ? getFilenamePrefix().get() : getFilenamePrefix().toString(); + ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString(); } builder .addIfNotNull(DisplayData.item("filePrefix", prefixString) .withLabel("Output File Prefix")) - .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix()) - .withLabel("Output File Suffix"), "") - .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate()) - .withLabel("Output Shard Name Template"), - DEFAULT_SHARD_TEMPLATE) + .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) + .withLabel("Output File Suffix")) + .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate()) + .withLabel("Output Shard Name Template")) .addIfNotDefault(DisplayData.item("numShards", getNumShards()) .withLabel("Maximum Output Shards"), 0) .addIfNotNull(DisplayData.item("fileHeader", getHeader()) http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java index 4efdc32..0ba537e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java @@ -23,7 +23,7 @@ import 
java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.MimeTypes; @@ -39,27 +39,15 @@ class TextSink extends FileBasedSink<String> { @Nullable private final String footer; TextSink( + ValueProvider<ResourceId> baseOutputFilename, FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer, WritableByteChannelFactory writableByteChannelFactory) { - super(filenamePolicy, writableByteChannelFactory); + super(baseOutputFilename, filenamePolicy, writableByteChannelFactory); this.header = header; this.footer = footer; } - - TextSink( - ValueProvider<String> baseOutputFilename, - String extension, - @Nullable String header, - @Nullable String footer, - String fileNameTemplate, - WritableByteChannelFactory writableByteChannelFactory) { - super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory); - this.header = header; - this.footer = footer; - } - @Override public FileBasedWriteOperation<String> createWriteOperation() { return new TextWriteOperation(this, header, footer); @@ -77,7 +65,7 @@ class TextSink extends FileBasedSink<String> { } @Override - public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception { + public FileBasedWriter<String> createWriter() throws Exception { return new TextWriter(this, header, footer); } } http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index dcd600f..2a057e4 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.io.FileBasedSink.FileResult; +import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -254,7 +255,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { // Lazily initialize the Writer if (writer == null) { LOG.info("Opening writer for write operation {}", writeOperation); - writer = writeOperation.createWriter(c.getPipelineOptions()); + writer = writeOperation.createWriter(); if (windowedWrites) { writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, @@ -318,7 +319,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { // In a sharded write, single input element represents one shard. We can open and close // the writer in each call to processElement. LOG.info("Opening writer for write operation {}", writeOperation); - FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions()); + FileBasedWriter<T> writer = writeOperation.createWriter(); if (windowedWrites) { writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), numShards); @@ -474,7 +475,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { ParDo.of(new WriteShardedBundles(null))); } } - results.setCoder(writeOperation.getFileResultCoder()); + results.setCoder(FileResultCoder.of()); if (windowedWrites) { // When processing streaming windowed writes, results will arrive multiple times. 
This @@ -484,7 +485,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { // whenever new data arrives. PCollection<KV<Void, FileResult>> keyedResults = results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null)); - keyedResults.setCoder(KvCoder.of(VoidCoder.of(), writeOperation.getFileResultCoder())); + keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of())); // Is the continuation trigger sufficient? keyedResults @@ -494,7 +495,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { public void processElement(ProcessContext c) throws Exception { LOG.info("Finalizing write operation {}.", writeOperation); List<FileResult> results = Lists.newArrayList(c.element().getValue()); - writeOperation.finalize(results, c.getPipelineOptions()); + writeOperation.finalize(results); LOG.debug("Done finalizing write operation {}", writeOperation); } })); @@ -540,7 +541,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { "Creating {} empty output shards in addition to {} written for a total of {}.", extraShardsNeeded, results.size(), minShardsNeeded); for (int i = 0; i < extraShardsNeeded; ++i) { - FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions()); + FileBasedWriter<T> writer = writeOperation.createWriter(); writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); FileResult emptyWrite = writer.close(); @@ -548,7 +549,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } LOG.debug("Done creating extra shards."); } - writeOperation.finalize(results, c.getPipelineOptions()); + writeOperation.finalize(results); LOG.debug("Done finalizing write operation {}", writeOperation); } }).withSideInputs(sideInputs.build())); http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index 0d91bbc..33913f8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -41,7 +41,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nonnull; -import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.DefaultFilenamePolicy; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -197,7 +197,7 @@ public class IOChannelUtils { public static WritableByteChannel create(String prefix, String shardTemplate, String suffix, int numShards, String mimeType) throws IOException { if (numShards == 1) { - return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1), + return create(DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, 0, 1), mimeType); } @@ -209,7 +209,7 @@ public class IOChannelUtils { Set<String> outputNames = new HashSet<>(); for (int i = 0; i < numShards; i++) { String outputName = - FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards); + DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, i, numShards); if (!outputNames.add(outputName)) { throw new IllegalArgumentException( "Shard name collision detected for: " + outputName); http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java index 
feee6a0..1f3f5a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.util; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; /** @@ -33,14 +34,13 @@ public class NoopPathValidator implements PathValidator { } @Override - public String validateInputFilePatternSupported(String filepattern) { - return filepattern; - } + public void validateInputFilePatternSupported(String filepattern) {} @Override - public String validateOutputFilePrefixSupported(String filePrefix) { - return filePrefix; - } + public void validateOutputFilePrefixSupported(String filePrefix) {} + + @Override + public void validateOutputResourceSupported(ResourceId resourceId) {} @Override public String verifyPath(String path) { http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java index 786cdcb..e18dd96 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -26,7 +26,6 @@ import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.hash.HashCode; @@ -38,6 +37,7 @@ import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; import java.util.ArrayList; 
import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -120,7 +120,7 @@ public class NumberedShardedFile implements ShardedFile { try { // Match inputPath which may contains glob Collection<Metadata> files = Iterables.getOnlyElement( - FileSystems.match(ImmutableList.of(filePattern))).metadata(); + FileSystems.match(Collections.singletonList(filePattern))).metadata(); LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern); http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java index a7ee16e..e69648b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import org.apache.beam.sdk.io.fs.ResourceId; + /** * Interface for controlling validation of paths. */ @@ -25,17 +27,22 @@ public interface PathValidator { * Validate that a file pattern is conforming. * * @param filepattern The file pattern to verify. - * @return The post-validation filepattern. */ - String validateInputFilePatternSupported(String filepattern); + void validateInputFilePatternSupported(String filepattern); /** * Validate that an output file prefix is conforming. * * @param filePrefix the file prefix to verify. - * @return The post-validation filePrefix. */ - String validateOutputFilePrefixSupported(String filePrefix); + void validateOutputFilePrefixSupported(String filePrefix); + + /** + * Validates that an output path is conforming. + * + * @param resourceId the file prefix to verify. 
+ */ + void validateOutputResourceSupported(ResourceId resourceId); /** * Validate that a path is a valid path and that the path http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 5991c96..1506aa9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -34,6 +35,8 @@ import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -52,9 +55,9 @@ import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -278,33 +281,31 @@ public class AvroIOTest { } private static class 
WindowedFilenamePolicy extends FilenamePolicy { - String outputFilePrefix; + final String outputFilePrefix; WindowedFilenamePolicy(String outputFilePrefix) { this.outputFilePrefix = outputFilePrefix; } @Override - public ValueProvider<String> getBaseOutputFilenameProvider() { - return StaticValueProvider.of(outputFilePrefix); + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext input, String extension) { + String filename = String.format( + "%s-%s-%s-of-%s-pane-%s%s%s", + outputFilePrefix, + input.getWindow(), + input.getShardNumber(), + input.getNumShards() - 1, + input.getPaneInfo().getIndex(), + input.getPaneInfo().isLast() ? "-final" : "", + extension); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public String windowedFilename(WindowedContext input) { - String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-" - + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-" - + input.getPaneInfo().getIndex(); - if (input.getPaneInfo().isLast()) { - filename += "-final"; - } - return filename; - } - - @Override - public String unwindowedFilename(Context input) { - String filename = outputFilePrefix + input.getShardNumber() + "-of-" - + (input.getNumShards() - 1); - return filename; + public ResourceId unwindowedFilename( + ResourceId outputDirectory, Context input, String extension) { + throw new UnsupportedOperationException("Expecting windowed outputs only"); } @Override @@ -320,8 +321,8 @@ public class AvroIOTest { @Test @Category({ValidatesRunner.class, UsesTestStream.class}) public void testWindowedAvroIOWrite() throws Throwable { - File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); - final String outputFilePrefix = baseOutputFile.getAbsolutePath(); + Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testwrite"); + String baseFilename = baseDir.resolve("prefix").toString(); Instant base = new Instant(0); 
ArrayList<GenericClass> allElements = new ArrayList<>(); @@ -349,7 +350,6 @@ public class AvroIOTest { secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size())))); } - TimestampedValue<GenericClass>[] firstWindowArray = firstWindowElements.toArray(new TimestampedValue[100]); TimestampedValue<GenericClass>[] secondWindowArray = @@ -364,11 +364,13 @@ public class AvroIOTest { Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)) .advanceWatermarkToInfinity(); + FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename); windowedAvroWritePipeline .apply(values) .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) .apply(AvroIO.write(GenericClass.class) - .to(new WindowedFilenamePolicy(outputFilePrefix)) + .to(baseFilename) + .withFilenamePolicy(policy) .withWindowedWrites() .withNumShards(2)); windowedAvroWritePipeline.run(); @@ -381,7 +383,7 @@ public class AvroIOTest { IntervalWindow intervalWindow = new IntervalWindow( windowStart, Duration.standardMinutes(1)); expectedFiles.add( - new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard + new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard + "-of-1" + "-pane-0-final")); } } @@ -442,7 +444,7 @@ public class AvroIOTest { @Test @SuppressWarnings("unchecked") @Category(NeedsRunner.class) - public void testMetdata() throws Exception { + public void testMetadata() throws Exception { List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -481,7 +483,8 @@ public class AvroIOTest { p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write); p.run(); - String shardNameTemplate = write.getShardTemplate(); + String shardNameTemplate = + firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE); assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate); } @@ -494,7 
+497,7 @@ public class AvroIOTest { for (int i = 0; i < numShards; i++) { expectedFiles.add( new File( - FileBasedSink.constructName( + DefaultFilenamePolicy.constructName( outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards))); } @@ -530,10 +533,10 @@ public class AvroIOTest { @Test public void testReadDisplayData() { - AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*"); + AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*"); DisplayData displayData = DisplayData.from(read); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem("filePattern", "/foo.*")); } @Test @@ -542,7 +545,7 @@ public class AvroIOTest { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); AvroIO.Read<GenericRecord> read = - AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*"); + AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*"); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); assertThat("AvroIO.Read should include the file pattern in its primitive transform", http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java new file mode 100644 index 0000000..c895da8 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests of {@link DefaultFilenamePolicy}. + */ +@RunWith(JUnit4.class) +public class DefaultFilenamePolicyTest { + @Test + public void testConstructName() { + assertEquals("output-001-of-123.txt", + constructName("output", "-SSS-of-NNN", ".txt", 1, 123)); + + assertEquals("out.txt/part-00042", + constructName("out.txt", "/part-SSSSS", "", 42, 100)); + + assertEquals("out.txt", + constructName("ou", "t.t", "xt", 1, 1)); + + assertEquals("out0102shard.txt", + constructName("out", "SSNNshard", ".txt", 1, 2)); + + assertEquals("out-2/1.part-1-of-2.txt", + constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2)); + } + + @Test + public void testConstructNameWithLargeShardCount() { + assertEquals("out-100-of-5000.txt", + constructName("out", "-SS-of-NN", ".txt", 100, 5000)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 7efe47c..d9bcef4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -17,9 +17,10 @@ */ package org.apache.beam.sdk.io; -import static org.apache.beam.sdk.io.FileBasedSink.constructName; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -37,7 +38,6 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -52,9 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.junit.Rule; @@ -64,50 +63,28 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Tests for FileBasedSink. + * Tests for {@link FileBasedSink}. 
*/ @RunWith(JUnit4.class) public class FileBasedSinkTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - private String baseOutputFilename = "output"; - private String tempDirectory = "temp"; + private final String tempDirectoryName = "temp"; - private String appendToTempFolder(String filename) { - return Paths.get(tmpFolder.getRoot().getPath(), filename).toString(); + private ResourceId getTemporaryFolder() { + return LocalResources.fromFile(tmpFolder.getRoot(), /* isDirectory */ true); } - private String getBaseOutputFilename() { - return appendToTempFolder(baseOutputFilename); + private ResourceId getBaseOutputDirectory() { + String baseOutputDirname = "output"; + return getTemporaryFolder() + .resolve(baseOutputDirname, StandardResolveOptions.RESOLVE_DIRECTORY); } - private String getBaseTempDirectory() { - return appendToTempFolder(tempDirectory); - } - - @Test - public void testConstructName() { - assertEquals("output-001-of-123.txt", - constructName("output", "-SSS-of-NNN", ".txt", 1, 123)); - - assertEquals("out.txt/part-00042", - constructName("out.txt", "/part-SSSSS", "", 42, 100)); - - assertEquals("out.txt", - constructName("ou", "t.t", "xt", 1, 1)); - - assertEquals("out0102shard.txt", - constructName("out", "SSNNshard", ".txt", 1, 2)); - - assertEquals("out-2/1.part-1-of-2.txt", - constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2)); - } - - @Test - public void testConstructNameWithLargeShardCount() { - assertEquals("out-100-of-5000.txt", - constructName("out", "-SS-of-NN", ".txt", 100, 5000)); + private ResourceId getBaseTempDirectory() { + return getTemporaryFolder() + .resolve(tempDirectoryName, StandardResolveOptions.RESOLVE_DIRECTORY); } /** @@ -117,30 +94,31 @@ public class FileBasedSinkTest { @Test public void testWriter() throws Exception { String testUid = "testId"; - String expectedFilename = IOChannelUtils.resolve(getBaseTempDirectory(), testUid); - SimpleSink.SimpleWriter writer = buildWriter(); - + ResourceId 
expectedFile = getBaseTempDirectory() + .resolve(testUid, StandardResolveOptions.RESOLVE_FILE); List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird"); List<String> expected = new ArrayList<>(); expected.add(SimpleSink.SimpleWriter.HEADER); expected.addAll(values); expected.add(SimpleSink.SimpleWriter.FOOTER); + SimpleSink.SimpleWriter writer = + buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter(); writer.openUnwindowed(testUid, -1, -1); for (String value : values) { writer.write(value); } FileResult result = writer.close(); - assertEquals(expectedFilename, result.getFilename()); - assertFileContains(expected, expectedFilename); + assertEquals(expectedFile, result.getFilename()); + assertFileContains(expected, expectedFile); } /** * Assert that a file contains the lines provided, in the same order as expected. */ - private void assertFileContains(List<String> expected, String filename) throws Exception { - try (BufferedReader reader = new BufferedReader(new FileReader(filename))) { + private void assertFileContains(List<String> expected, ResourceId file) throws Exception { + try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) { List<String> actual = new ArrayList<>(); for (;;) { String line = reader.readLine(); @@ -149,7 +127,7 @@ public class FileBasedSinkTest { } actual.add(line); } - assertEquals(expected, actual); + assertEquals("contents for " + file, expected, actual); } } @@ -165,19 +143,11 @@ public class FileBasedSinkTest { } /** - * Removes temporary files when temporary and output filenames differ. + * Removes temporary files when temporary and output directories differ. */ @Test public void testRemoveWithTempFilename() throws Exception { - testRemoveTemporaryFiles(3, tempDirectory); - } - - /** - * Removes only temporary files, even if temporary and output files share the same base filename. 
- */ - @Test - public void testRemoveWithSameFilename() throws Exception { - testRemoveTemporaryFiles(3, baseOutputFilename); + testRemoveTemporaryFiles(3, getBaseTempDirectory()); } /** @@ -205,13 +175,13 @@ public class FileBasedSinkTest { */ @Test public void testFinalizeWithIntermediateState() throws Exception { - List<File> files = generateTemporaryFilesForFinalize(3); SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); + List<File> files = generateTemporaryFilesForFinalize(3); runFinalize(writeOp, files); - // create a temporary file - tmpFolder.newFolder(tempDirectory); - tmpFolder.newFile(tempDirectory + "/1"); + // create a temporary file and then rerun finalize + tmpFolder.newFolder(tempDirectoryName); + tmpFolder.newFile(tempDirectoryName + "/1"); runFinalize(writeOp, files); } @@ -222,9 +192,9 @@ public class FileBasedSinkTest { private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception { List<File> temporaryFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - String temporaryFilename = - FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i); - File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename); + ResourceId temporaryFile = + FileBasedWriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i); + File tmpFile = new File(tmpFolder.getRoot(), temporaryFile.toString()); tmpFile.getParentFile().mkdirs(); assertTrue(tmpFile.createNewFile()); temporaryFiles.add(tmpFile); @@ -238,26 +208,26 @@ public class FileBasedSinkTest { */ private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles) throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - int numFiles = temporaryFiles.size(); List<FileResult> fileResults = new ArrayList<>(); // Create temporary output bundles and output File objects. 
- for (int i = 0; i < numFiles; i++) { - fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null)); + for (File f : temporaryFiles) { + ResourceId file = LocalResources.fromFile(f, false); + fileResults.add(new FileResult(file, null)); } - writeOp.finalize(fileResults, options); + writeOp.finalize(fileResults); + ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get(); for (int i = 0; i < numFiles; i++) { - String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename( - new Context(i, numFiles)); - assertTrue(new File(outputFilename).exists()); + ResourceId outputFilename = writeOp.getSink().getFilenamePolicy() + .unwindowedFilename(outputDirectory, new Context(i, numFiles), ""); + assertTrue(new File(outputFilename.toString()).exists()); assertFalse(temporaryFiles.get(i).exists()); } - assertFalse(new File(writeOp.tempDirectory.get()).exists()); + assertFalse(new File(writeOp.tempDirectory.get().toString()).exists()); // Test that repeated requests of the temp directory return a stable result. assertEquals(writeOp.tempDirectory.get(), writeOp.tempDirectory.get()); } @@ -266,28 +236,43 @@ public class FileBasedSinkTest { * Create n temporary and output files and verify that removeTemporaryFiles only * removes temporary files. 
*/ - private void testRemoveTemporaryFiles(int numFiles, String baseTemporaryFilename) + private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(baseTemporaryFilename); + String prefix = "file"; + SimpleSink sink = + new SimpleSink(getBaseOutputDirectory(), prefix, "", ""); + + FileBasedWriteOperation<String> writeOp = + new SimpleSink.SimpleWriteOperation(sink, tempDirectory); List<File> temporaryFiles = new ArrayList<>(); List<File> outputFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - File tmpFile = new File(tmpFolder.getRoot(), - FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i)); + ResourceId tempResource = + FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, prefix + i); + File tmpFile = new File(tempResource.toString()); tmpFile.getParentFile().mkdirs(); - assertTrue(tmpFile.createNewFile()); + assertTrue("not able to create new temp file", tmpFile.createNewFile()); temporaryFiles.add(tmpFile); - File outputFile = tmpFolder.newFile(baseOutputFilename + i); + ResourceId outputFileId = + getBaseOutputDirectory().resolve(prefix + i, StandardResolveOptions.RESOLVE_FILE); + File outputFile = new File(outputFileId.toString()); + outputFile.getParentFile().mkdirs(); + assertTrue("not able to create new output file", outputFile.createNewFile()); outputFiles.add(outputFile); } - writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options); + writeOp.removeTemporaryFiles(Collections.<ResourceId>emptySet(), true); for (int i = 0; i < numFiles; i++) { - assertFalse(temporaryFiles.get(i).exists()); - assertTrue(outputFiles.get(i).exists()); + File temporaryFile = temporaryFiles.get(i); + assertThat( + String.format("temp file %s exists", temporaryFile), + temporaryFile.exists(), is(false)); + File outputFile = outputFiles.get(i); + 
assertThat( + String.format("output file %s exists", outputFile), + outputFile.exists(), is(true)); } } @@ -296,111 +281,79 @@ public class FileBasedSinkTest { */ @Test public void testCopyToOutputFiles() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); + ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get(); List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3"); List<String> inputContents = Arrays.asList("1", "2", "3"); List<String> expectedOutputFilenames = Arrays.asList( - "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test"); + "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test"); - Map<String, String> inputFilePaths = new HashMap<>(); - List<String> expectedOutputPaths = new ArrayList<>(); + Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>(); + List<ResourceId> expectedOutputPaths = new ArrayList<>(); for (int i = 0; i < inputFilenames.size(); i++) { // Generate output paths. - File outputFile = tmpFolder.newFile(expectedOutputFilenames.get(i)); - expectedOutputPaths.add(outputFile.toString()); + expectedOutputPaths.add( + getBaseOutputDirectory() + .resolve(expectedOutputFilenames.get(i), StandardResolveOptions.RESOLVE_FILE)); // Generate and write to input paths. 
File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i)); - List<String> lines = Arrays.asList(inputContents.get(i)); + List<String> lines = Collections.singletonList(inputContents.get(i)); writeFile(lines, inputTmpFile); - inputFilePaths.put(inputTmpFile.toString(), - writeOp.getSink().getFileNamePolicy().unwindowedFilename( - new Context(i, inputFilenames.size()))); + inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false), + writeOp.getSink().getFilenamePolicy() + .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), "")); } // Copy input files to output files. - writeOp.copyToOutputFiles(inputFilePaths, options); + writeOp.copyToOutputFiles(inputFilePaths); // Assert that the contents were copied. for (int i = 0; i < expectedOutputPaths.size(); i++) { - assertFileContains(Arrays.asList(inputContents.get(i)), expectedOutputPaths.get(i)); + assertFileContains( + Collections.singletonList(inputContents.get(i)), expectedOutputPaths.get(i)); } } - public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) { - List<String> filenames = new ArrayList<>(); + public List<ResourceId> generateDestinationFilenames( + ResourceId outputDirectory, FilenamePolicy policy, int numFiles) { + List<ResourceId> filenames = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - filenames.add(policy.unwindowedFilename(new Context(i, numFiles))); + filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), "")); } return filenames; } /** - * Output filenames use the supplied naming template. 
- */ - @Test - public void testGenerateOutputFilenamesWithTemplate() { - List<String> expected; - List<String> actual; - SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN"); - FilenamePolicy policy = sink.getFileNamePolicy(); - - expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"), - appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test")); - actual = generateDestinationFilenames(policy, 3); - assertEquals(expected, actual); - - expected = Arrays.asList(appendToTempFolder("output.00.of.01.test")); - actual = generateDestinationFilenames(policy, 1); - assertEquals(expected, actual); - - expected = new ArrayList<>(); - actual = generateDestinationFilenames(policy, 0); - assertEquals(expected, actual); - - // Also validate that we handle the case where the user specified "." that we do - // not prefix an additional "." making "..test" - sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN"); - expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"), - appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test")); - actual = generateDestinationFilenames(policy, 3); - assertEquals(expected, actual); - - expected = Arrays.asList(appendToTempFolder("output.00.of.01.test")); - actual = generateDestinationFilenames(policy, 1); - assertEquals(expected, actual); - - expected = new ArrayList<>(); - actual = generateDestinationFilenames(policy, 0); - assertEquals(expected, actual); - } - - /** * Output filenames are generated correctly when an extension is supplied. 
*/ @Test - public void testGenerateOutputFilenamesWithExtension() { - List<String> expected; - List<String> actual; - SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); - FilenamePolicy policy = writeOp.getSink().getFileNamePolicy(); + public void testGenerateOutputFilenames() { + List<ResourceId> expected; + List<ResourceId> actual; + ResourceId root = getBaseOutputDirectory(); + + SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test"); + FilenamePolicy policy = sink.getFilenamePolicy(); expected = Arrays.asList( - appendToTempFolder("output-00000-of-00003.test"), - appendToTempFolder("output-00001-of-00003.test"), - appendToTempFolder("output-00002-of-00003.test")); - actual = generateDestinationFilenames(policy, 3); + root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE) + ); + actual = generateDestinationFilenames(root, policy, 3); assertEquals(expected, actual); - expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test")); - actual = generateDestinationFilenames(policy, 1); + expected = Collections.singletonList( + root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE) + ); + actual = generateDestinationFilenames(root, policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = generateDestinationFilenames(policy, 0); + actual = generateDestinationFilenames(root, policy, 0); assertEquals(expected, actual); } @@ -408,16 +361,21 @@ public class FileBasedSinkTest { * Reject non-distinct output filenames. 
*/ @Test - public void testCollidingOutputFilenames() { - SimpleSink sink = new SimpleSink("output", "test", "-NN"); + public void testCollidingOutputFilenames() throws IOException { + ResourceId root = getBaseOutputDirectory(); + SimpleSink sink = new SimpleSink(root, "file", "-NN", "test"); SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink); + ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE); + ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE); + ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE); + ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE); // More than one shard does. try { Iterable<FileResult> results = Lists.newArrayList( - new FileResult("temp1", "file1"), - new FileResult("temp2", "file1"), - new FileResult("temp3", "file1")); + new FileResult(temp1, output), + new FileResult(temp2, output), + new FileResult(temp3, output)); writeOp.buildOutputFilenames(results); fail("Should have failed."); @@ -432,22 +390,28 @@ public class FileBasedSinkTest { */ @Test public void testGenerateOutputFilenamesWithoutExtension() { - List<String> expected; - List<String> actual; - SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), ""); - FilenamePolicy policy = sink.getFileNamePolicy(); - - expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"), - appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003")); - actual = generateDestinationFilenames(policy, 3); + List<ResourceId> expected; + List<ResourceId> actual; + ResourceId root = getBaseOutputDirectory(); + SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", ""); + FilenamePolicy policy = sink.getFilenamePolicy(); + + expected = Arrays.asList( + root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file-00001-of-00003", 
StandardResolveOptions.RESOLVE_FILE), + root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE) + ); + actual = generateDestinationFilenames(root, policy, 3); assertEquals(expected, actual); - expected = Arrays.asList(appendToTempFolder("output-00000-of-00001")); - actual = generateDestinationFilenames(policy, 1); + expected = Collections.singletonList( + root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE) + ); + actual = generateDestinationFilenames(root, policy, 1); assertEquals(expected, actual); expected = new ArrayList<>(); - actual = generateDestinationFilenames(policy, 0); + actual = generateDestinationFilenames(root, policy, 0); assertEquals(expected, actual); } @@ -511,7 +475,7 @@ public class FileBasedSinkTest { private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory, String... values) - throws IOException, FileNotFoundException { + throws IOException { final File file = tmpFolder.newFile("test.gz"); final WritableByteChannel channel = factory.create(Channels.newChannel(new FileOutputStream(file))); @@ -529,12 +493,13 @@ public class FileBasedSinkTest { @Test public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception { final String testUid = "testId"; - SimpleSink.SimpleWriteOperation writeOp = - new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory()) + ResourceId root = getBaseOutputDirectory(); + FileBasedWriteOperation<String> writeOp = + new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()) .createWriteOperation(); - final FileBasedWriter<String> writer = - writeOp.createWriter(null); - final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid); + final FileBasedWriter<String> writer = writeOp.createWriter(); + final ResourceId expectedFile = + writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE); final List<String> expected = 
new ArrayList<>(); expected.add("header"); @@ -551,38 +516,29 @@ public class FileBasedSinkTest { writer.write("b"); final FileResult result = writer.close(); - assertEquals(expectedFilename, result.getFilename()); - assertFileContains(expected, expectedFilename); + assertEquals(expectedFile, result.getFilename()); + assertFileContains(expected, expectedFile); } /** * Build a SimpleSink with default options. */ private SimpleSink buildSink() { - return new SimpleSink(getBaseOutputFilename(), "test"); + return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test"); } /** - * Build a SimpleWriteOperation with default options and the given base temporary filename. + * Build a SimpleWriteOperation with default options and the given temporary directory. */ - private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporaryFilename) { + private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) { SimpleSink sink = buildSink(); - return new SimpleSink.SimpleWriteOperation(sink, appendToTempFolder(baseTemporaryFilename)); + return new SimpleSink.SimpleWriteOperation(sink, tempDirectory); } /** * Build a write operation with the default options for it and its parent sink. */ private SimpleSink.SimpleWriteOperation buildWriteOperation() { - SimpleSink sink = buildSink(); - return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory()); - } - - /** - * Build a writer with the default options for its parent write operation and sink. 
- */ - private SimpleSink.SimpleWriter buildWriter() { - SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); - return new SimpleSink.SimpleWriter(writeOp); + return buildSink().createWriteOperation(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index f83642a..9265520 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -19,24 +19,25 @@ package org.apache.beam.sdk.io; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.util.MimeTypes; /** - * A simple FileBasedSink that writes String values as lines with header and footer lines. + * A simple {@link FileBasedSink} that writes {@link String} values as lines with + * header and footer. 
*/ class SimpleSink extends FileBasedSink<String> { - public SimpleSink(String baseOutputFilename, String extension) { - super(baseOutputFilename, extension); + public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) { + this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED); } - public SimpleSink(String baseOutputFilename, String extension, + public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix, WritableByteChannelFactory writableByteChannelFactory) { - super(baseOutputFilename, extension, writableByteChannelFactory); - } - - public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) { - super(baseOutputFilename, extension, fileNamingTemplate); + super( + StaticValueProvider.of(baseOutputDirectory), + new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix), + writableByteChannelFactory); } @Override @@ -45,8 +46,8 @@ class SimpleSink extends FileBasedSink<String> { } static final class SimpleWriteOperation extends FileBasedWriteOperation<String> { - public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) { - super(sink, tempOutputFilename); + public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) { + super(sink, tempOutputDirectory); } public SimpleWriteOperation(SimpleSink sink) { @@ -54,7 +55,7 @@ class SimpleSink extends FileBasedSink<String> { } @Override - public SimpleWriter createWriter(PipelineOptions options) throws Exception { + public SimpleWriter createWriter() throws Exception { return new SimpleWriter(this); } } http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
index 66b605f..685da82 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY; import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; @@ -28,7 +29,6 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED; import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; -import static org.apache.beam.sdk.util.IOChannelUtils.resolve; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -62,6 +62,7 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.zip.GZIPOutputStream; @@ -73,6 +74,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -80,19 +83,16 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import 
org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -101,7 +101,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Tests for TextIO Read and Write transforms. + * Tests for {@link TextIO} {@link TextIO.Read} and {@link TextIO.Write} transforms. 
*/ // TODO: Change the tests to use ValidatesRunner instead of NeedsRunner @RunWith(JUnit4.class) @@ -168,7 +168,6 @@ public class TextIOTest { @BeforeClass public static void setupClass() throws IOException { - IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions()); tempFolder = Files.createTempDirectory("TextIOTest"); // empty files emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED); @@ -314,7 +313,7 @@ public class TextIOTest { p.run(); assertOutputFiles(elems, header, footer, numShards, baseDir, outputName, - write.getShardTemplate()); + firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE)); } public static void assertOutputFiles( @@ -328,17 +327,18 @@ public class TextIOTest { throws Exception { List<File> expectedFiles = new ArrayList<>(); if (numShards == 0) { - String pattern = - resolve(rootLocation.toAbsolutePath().toString(), outputName + "*"); - for (String expected : IOChannelUtils.getFactory(pattern).match(pattern)) { - expectedFiles.add(new File(expected)); + String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString(); + List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern)); + for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) { + expectedFiles.add(new File(expectedFile.resourceId().toString())); } } else { for (int i = 0; i < numShards; i++) { expectedFiles.add( new File( rootLocation.toString(), - FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards))); + DefaultFilenamePolicy.constructName( + outputName, shardNameTemplate, "", i, numShards))); } } @@ -483,7 +483,7 @@ public class TextIOTest { @Test public void testWriteDisplayData() { TextIO.Write write = TextIO.write() - .to("foo") + .to("/foo") .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") .withNumShards(100) @@ -492,7 +492,7 @@ public class TextIOTest { DisplayData displayData = DisplayData.from(write); 
- assertThat(displayData, hasDisplayItem("filePrefix", "foo")); + assertThat(displayData, hasDisplayItem("filePrefix", "/foo")); assertThat(displayData, hasDisplayItem("fileSuffix", "bar")); assertThat(displayData, hasDisplayItem("fileHeader", "myHeader")); assertThat(displayData, hasDisplayItem("fileFooter", "myFooter")); @@ -523,23 +523,6 @@ public class TextIOTest { assertThat(displayData, hasDisplayItem("fileFooter", "myFooter")); } - @Test - @Category(ValidatesRunner.class) - @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient") - public void testPrimitiveWriteDisplayData() throws IOException { - PipelineOptions options = DisplayDataEvaluator.getDefaultOptions(); - String tempRoot = options.as(TestPipelineOptions.class).getTempRoot(); - String outputPath = IOChannelUtils.getFactory(tempRoot).resolve(tempRoot, "foobar"); - - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - - TextIO.Write write = TextIO.write().to(outputPath); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("TextIO.Write should include the file prefix in its primitive display data", - displayData, hasItem(hasDisplayItem(hasValue(startsWith(outputPath))))); - } - /** Options for testing. */ public interface RuntimeTestOptions extends PipelineOptions { ValueProvider<String> getInput();