Repository: incubator-beam Updated Branches: refs/heads/master 26eb4354c -> c84045573
Add TextIO.Write support for runtime-valued output prefix * Updates to TextIO * Updates for FileBasedSink to support this change * Updates to other FileBasedSinks that do not yet support runtime values but need to be aware that values are now ValueProvider<String> instead of String Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a038c4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a038c4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a038c4f Branch: refs/heads/master Commit: 9a038c4f3404a3707eca29c5e898014df7fafbf4 Parents: 26eb435 Author: Sam McVeety <s...@google.com> Authored: Wed Nov 30 14:06:59 2016 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri Dec 2 17:24:12 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 22 +++++++++------ .../java/org/apache/beam/sdk/io/TextIO.java | 28 ++++++++++++++++---- .../java/org/apache/beam/sdk/io/XmlSink.java | 4 +-- .../org/apache/beam/sdk/io/XmlSinkTest.java | 6 ++--- 4 files changed, 42 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 5375b90..1396ab6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -41,6 +41,8 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -135,7 +137,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { /** * Base filename for final output files. */ - protected final String baseOutputFilename; + protected final ValueProvider<String> baseOutputFilename; /** * The extension to be used for the final output files. @@ -162,7 +164,8 @@ public abstract class FileBasedSink<T> extends Sink<T> { */ public FileBasedSink(String baseOutputFilename, String extension, WritableByteChannelFactory writableByteChannelFactory) { - this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory); + this(StaticValueProvider.of(baseOutputFilename), extension, + ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory); } /** @@ -173,7 +176,8 @@ public abstract class FileBasedSink<T> extends Sink<T> { * <p>See {@link ShardNameTemplate} for a description of file naming templates. */ public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) { - this(baseOutputFilename, extension, fileNamingTemplate, CompressionType.UNCOMPRESSED); + this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate, + CompressionType.UNCOMPRESSED); } /** @@ -182,8 +186,8 @@ public abstract class FileBasedSink<T> extends Sink<T> { * * <p>See {@link ShardNameTemplate} for a description of file naming templates. */ - public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate, - WritableByteChannelFactory writableByteChannelFactory) { + public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension, + String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) { this.writableByteChannelFactory = writableByteChannelFactory; this.baseOutputFilename = baseOutputFilename; if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) { @@ -198,7 +202,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { * Returns the base output filename for this file based sink. */ public String getBaseOutputFilename() { - return baseOutputFilename; + return baseOutputFilename.get(); } @Override @@ -216,7 +220,9 @@ public abstract class FileBasedSink<T> extends Sink<T> { super.populateDisplayData(builder); String fileNamePattern = String.format("%s%s%s", - baseOutputFilename, fileNamingTemplate, getFileExtension(extension)); + baseOutputFilename.isAccessible() + ? baseOutputFilename.get() : baseOutputFilename.toString(), + fileNamingTemplate, getFileExtension(extension)); builder.add(DisplayData.item("fileNamePattern", fileNamePattern) .withLabel("File Name Pattern")); } @@ -420,7 +426,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { protected final List<String> generateDestinationFilenames(int numFiles) { List<String> destFilenames = new ArrayList<>(); String extension = getSink().extension; - String baseOutputFilename = getSink().baseOutputFilename; + String baseOutputFilename = getSink().baseOutputFilename.get(); String fileNamingTemplate = getSink().fileNamingTemplate; String suffix = getFileExtension(extension); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 84c24ea..e967a27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -404,6 +404,13 @@ public class TextIO { } /** + * Like {@link #to(String)}, but with a {@link ValueProvider}. + */ + public static Bound<String> to(ValueProvider<String> prefix) { + return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); + } + + /** * Returns a transform for writing to text files that appends the specified suffix * to the created files. */ @@ -521,7 +528,7 @@ public class TextIO { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The prefix of each file written, combined with suffix and shardTemplate. */ - @Nullable private final String filenamePrefix; + private final ValueProvider<String> filenamePrefix; /** The suffix of each file written, combined with prefix and shardTemplate. */ private final String filenameSuffix; @@ -554,7 +561,7 @@ public class TextIO { FileBasedSink.CompressionType.UNCOMPRESSED); } - private Bound(String name, String filenamePrefix, String filenameSuffix, + private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix, @Nullable String header, @Nullable String footer, Coder<T> coder, int numShards, String shardTemplate, boolean validate, WritableByteChannelFactory writableByteChannelFactory) { @@ -581,6 +588,15 @@ public class TextIO { */ public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); + return new Bound<>(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, + header, footer, coder, numShards, shardTemplate, validate, + writableByteChannelFactory); + } + + /** + * Like {@link #to(String)}, but with a {@link ValueProvider}. + */ + public Bound<T> to(ValueProvider<String> filenamePrefix) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -745,8 +761,10 @@ public class TextIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + String prefixString = filenamePrefix.isAccessible() + ? filenamePrefix.get() : filenamePrefix.toString(); builder - .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix) + .addIfNotNull(DisplayData.item("filePrefix", prefixString) .withLabel("Output File Prefix")) .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) .withLabel("Output Fix Suffix"), "") @@ -779,7 +797,7 @@ public class TextIO { } public String getFilenamePrefix() { - return filenamePrefix; + return filenamePrefix.get(); } public String getShardTemplate() { @@ -1101,7 +1119,7 @@ public class TextIO { @VisibleForTesting TextSink( - String baseOutputFilename, String extension, + ValueProvider<String> baseOutputFilename, String extension, @Nullable String header, @Nullable String footer, String fileNameTemplate, Coder<T> coder, WritableByteChannelFactory writableByteChannelFactory) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java index 983eed2..0f25aea 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java @@ -176,7 +176,7 @@ public class XmlSink { * <p>The specified class must be able to be used to create a JAXB context. */ public <T> Bound<T> ofRecordClass(Class<T> classToBind) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename); + return new Bound<>(classToBind, rootElementName, baseOutputFilename.get()); } /** @@ -194,7 +194,7 @@ public class XmlSink { * supplied name. */ public Bound<T> withRootElement(String rootElementName) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename); + return new Bound<>(classToBind, rootElementName, baseOutputFilename.get()); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java index 400b04a..f9a9655 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java @@ -93,7 +93,7 @@ public class XmlSinkTest { .withRootElement(testRootElement); assertEquals(testClass, sink.classToBind); assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.baseOutputFilename); + assertEquals(testFilePrefix, sink.baseOutputFilename.get()); } /** @@ -105,7 +105,7 @@ public class XmlSinkTest { XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix); assertEquals(testClass, sink.classToBind); assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.baseOutputFilename); + assertEquals(testFilePrefix, sink.baseOutputFilename.get()); } /** @@ -142,7 +142,7 @@ public class XmlSinkTest { XmlSink.writeOf(testClass, testRootElement, testFilePrefix); XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options); assertEquals(testClass, writeOp.getSink().classToBind); - assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename); + assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get()); assertEquals(testRootElement, writeOp.getSink().rootElementName); assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension); Path outputPath = new File(testFilePrefix).toPath();