Repository: beam Updated Branches: refs/heads/master b4bafd092 -> 1bc50d627
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index ea0395d..a5dacd1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -29,7 +29,6 @@ import static org.junit.Assert.assertThat; import com.google.common.base.Optional; import com.google.common.collect.Lists; - import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -39,12 +38,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; - import java.util.concurrent.ThreadLocalRandom; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.SimpleSink.SimpleWriter; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -149,7 +148,8 @@ public class WriteFilesTest { } private String getBaseOutputFilename() { - return appendToTempFolder("baseoutput"); + return getBaseOutputDirectory() + .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString(); } /** @@ -188,6 +188,15 @@ public class WriteFilesTest { Optional.of(1)); } + private ResourceId getBaseOutputDirectory() { + return LocalResources.fromFile(tmpFolder.getRoot(), true) + .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY); + + } + private SimpleSink makeSimpleSink() { + return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple"); + } + @Test @Category(NeedsRunner.class) public void testCustomShardedWrite() throws IOException { @@ -204,7 +213,7 @@ public class WriteFilesTest { timestamps.add(i + 1); } - SimpleSink sink = new SimpleSink(getBaseOutputFilename(), ""); + SimpleSink sink = makeSimpleSink(); WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt()); p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(IDENTITY_MAP) @@ -270,7 +279,7 @@ public class WriteFilesTest { @Test public void testBuildWrite() { - SimpleSink sink = new SimpleSink(getBaseOutputFilename(), ""); + SimpleSink sink = makeSimpleSink(); WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3); assertThat((SimpleSink) write.getSink(), is(sink)); PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = @@ -293,7 +302,7 @@ public class WriteFilesTest { @Test public void testDisplayData() { - SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") { + SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("foo", "bar")); @@ -308,7 +317,7 @@ public class WriteFilesTest { @Test public void testShardedDisplayData() { - SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") { + SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("foo", "bar")); @@ -323,7 +332,7 @@ public class WriteFilesTest { @Test public void testCustomShardStrategyDisplayData() { - SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") { + SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") { @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("foo", "bar")); @@ -354,7 +363,7 @@ public class WriteFilesTest { * methods on a test sink in the correct order, as well as verifies that the elements of a * PCollection are written to the sink. */ - private static void runWrite( + private void runWrite( List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, String baseName) throws IOException { runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent()); @@ -366,7 +375,7 @@ public class WriteFilesTest { * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards * is not null, also verifies that the output number of shards is correct. */ - private static void runShardedWrite( + private void runShardedWrite( List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, String baseName, @@ -382,7 +391,7 @@ public class WriteFilesTest { timestamps.add(i + 1); } - SimpleSink sink = new SimpleSink(baseName, ""); + SimpleSink sink = makeSimpleSink(); WriteFiles<String> write = WriteFiles.to(sink); if (numConfiguredShards.isPresent()) { write = write.withNumShards(numConfiguredShards.get()); @@ -399,8 +408,10 @@ public class WriteFilesTest { Optional<Integer> numExpectedShards) throws IOException { List<File> outputFiles = Lists.newArrayList(); final String pattern = baseName + "*"; - for (String outputFileName : IOChannelUtils.getFactory(pattern).match(pattern)) { - outputFiles.add(new File(outputFileName)); + List<Metadata> metadata = + FileSystems.match(Collections.singletonList(pattern)).get(0).metadata(); + for (Metadata meta : metadata) { + outputFiles.add(new File(meta.resourceId().toString())); } if (numExpectedShards.isPresent()) { assertEquals(numExpectedShards.get().intValue(), outputFiles.size()); http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java index 4d58424..9ad4152 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.gcsfs.GcsPath; @@ -44,12 +45,11 @@ public class GcsPathValidator implements PathValidator { * is well formed. */ @Override - public String validateInputFilePatternSupported(String filepattern) { + public void validateInputFilePatternSupported(String filepattern) { GcsPath gcsPath = getGcsPath(filepattern); - checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject())); - String returnValue = verifyPath(filepattern); + checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject())); + verifyPath(filepattern); verifyPathIsAccessible(filepattern, "Could not find file %s"); - return returnValue; } /** @@ -57,10 +57,18 @@ public class GcsPathValidator implements PathValidator { * is well formed. */ @Override - public String validateOutputFilePrefixSupported(String filePrefix) { - String returnValue = verifyPath(filePrefix); + public void validateOutputFilePrefixSupported(String filePrefix) { + verifyPath(filePrefix); verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); - return returnValue; + } + + @Override + public void validateOutputResourceSupported(ResourceId resourceId) { + checkArgument( + resourceId.getScheme().equals("gs"), + "Expected a valid 'gs://' path but was given: '%s'", + resourceId); + verifyPath(resourceId.toString()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index c41d6bc..a34fb0f 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -21,19 +21,19 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; - import java.nio.charset.Charset; - import javax.annotation.Nullable; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.ValidationEventHandler; - import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CompressedSource; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.OffsetBasedSource; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -450,7 +450,7 @@ public class XmlIO { @AutoValue public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { @Nullable - abstract String getFilenamePrefix(); + abstract ValueProvider<ResourceId> getFilenamePrefix(); @Nullable abstract Class<T> getRecordClass(); @@ -465,7 +465,7 @@ public class XmlIO { @AutoValue.Builder abstract static class Builder<T> { - abstract Builder<T> setFilenamePrefix(String baseOutputFilename); + abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> prefix); abstract Builder<T> setRecordClass(Class<T> recordClass); @@ -482,8 +482,9 @@ public class XmlIO { * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is * the number of output bundles. */ - public Write<T> toFilenamePrefix(String filenamePrefix) { - return toBuilder().setFilenamePrefix(filenamePrefix).build(); + public Write<T> to(String filenamePrefix) { + ResourceId resourceId = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix); + return toBuilder().setFilenamePrefix(StaticValueProvider.of(resourceId)).build(); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index 6f87d75..963ab1b 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -24,7 +24,10 @@ import java.nio.channels.WritableByteChannel; import javax.xml.bind.JAXBContext; import javax.xml.bind.Marshaller; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.ShardNameTemplate; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; @@ -32,12 +35,17 @@ import org.apache.beam.sdk.util.MimeTypes; /** Implementation of {@link XmlIO#write}. */ class XmlSink<T> extends FileBasedSink<T> { - protected static final String XML_EXTENSION = "xml"; + private static final String XML_EXTENSION = ".xml"; private final XmlIO.Write<T> spec; + private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) { + return DefaultFilenamePolicy.constructUsingStandardParameters( + spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION); + } + XmlSink(XmlIO.Write<T> spec) { - super(spec.getFilenamePrefix(), XML_EXTENSION); + super(spec.getFilenamePrefix(), makeFilenamePolicy(spec)); this.spec = spec; } @@ -79,7 +87,7 @@ class XmlSink<T> extends FileBasedSink<T> { * Creates a {@link XmlWriter} with a marshaller for the type it will write. */ @Override - public XmlWriter<T> createWriter(PipelineOptions options) throws Exception { + public XmlWriter<T> createWriter() throws Exception { JAXBContext context; Marshaller marshaller; context = JAXBContext.newInstance(getSink().spec.getRecordClass()); @@ -99,7 +107,7 @@ class XmlSink<T> extends FileBasedSink<T> { } @VisibleForTesting - String getTemporaryDirectory() { + ResourceId getTemporaryDirectory() { return this.tempDirectory.get(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java index 7f9a8c5..aa0c1c3 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.xml; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -62,7 +63,7 @@ public class XmlSinkTest { public ExpectedException thrown = ExpectedException.none(); private String testRootElement = "testElement"; - private String testFilePrefix = "/path/to/testPrefix"; + private String testFilePrefix = "/path/to/file"; /** * An XmlWriter correctly writes objects as Xml elements with an enclosing root element. @@ -72,12 +73,12 @@ public class XmlSinkTest { PipelineOptions options = PipelineOptionsFactory.create(); XmlWriteOperation<Bird> writeOp = XmlIO.<Bird>write() - .toFilenamePrefix(testFilePrefix) + .to(testFilePrefix) .withRecordClass(Bird.class) .withRootElement("birds") .createSink() .createWriteOperation(); - XmlWriter<Bird> writer = writeOp.createWriter(options); + XmlWriter<Bird> writer = writeOp.createWriter(); List<Bird> bundle = Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose")); @@ -89,16 +90,15 @@ public class XmlSinkTest { @Test public void testXmlWriterCharset() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); XmlWriteOperation<Bird> writeOp = XmlIO.<Bird>write() - .toFilenamePrefix(testFilePrefix) + .to(testFilePrefix) .withRecordClass(Bird.class) .withRootElement("birds") .withCharset(StandardCharsets.ISO_8859_1) .createSink() .createWriteOperation(); - XmlWriter<Bird> writer = writeOp.createWriter(options); + XmlWriter<Bird> writer = writeOp.createWriter(); List<Bird> bundle = Lists.newArrayList(new Bird("bréche", "pinçon")); List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>pinçon</species>", @@ -113,12 +113,15 @@ public class XmlSinkTest { public void testBuildXmlWriteTransform() { XmlIO.Write<Bird> write = XmlIO.<Bird>write() - .toFilenamePrefix(testFilePrefix) + .to(testFilePrefix) .withRecordClass(Bird.class) .withRootElement(testRootElement); assertEquals(Bird.class, write.getRecordClass()); assertEquals(testRootElement, write.getRootElement()); - assertEquals(testFilePrefix, write.getFilenamePrefix()); + assertNotNull(write.getFilenamePrefix()); + assertThat( + write.getFilenamePrefix().toString(), + containsString(testFilePrefix)); } /** Validation ensures no fields are missing. */ @@ -126,19 +129,21 @@ public class XmlSinkTest { public void testValidateXmlSinkMissingRecordClass() { thrown.expect(NullPointerException.class); XmlIO.<Bird>write() + .to(testFilePrefix) .withRootElement(testRootElement) - .toFilenamePrefix(testFilePrefix) .validate(null); } @Test public void testValidateXmlSinkMissingRootElement() { thrown.expect(NullPointerException.class); - XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null); + XmlIO.<Bird>write().withRecordClass(Bird.class) + .to(testFilePrefix) + .validate(null); } @Test - public void testValidateXmlSinkMissingFilePrefix() { + public void testValidateXmlSinkMissingOutputDirectory() { thrown.expect(NullPointerException.class); XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null); } @@ -151,16 +156,15 @@ public class XmlSinkTest { PipelineOptions options = PipelineOptionsFactory.create(); XmlSink<Bird> sink = XmlIO.<Bird>write() + .to(testFilePrefix) .withRecordClass(Bird.class) .withRootElement(testRootElement) - .toFilenamePrefix(testFilePrefix) .createSink(); XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(); Path outputPath = new File(testFilePrefix).toPath(); - Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath(); - assertEquals(outputPath.getParent(), tempPath.getParent()); - assertThat( - tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); + Path tempPath = new File(writeOp.getTemporaryDirectory().toString()).toPath(); + assertThat(tempPath.getParent(), equalTo(outputPath.getParent())); + assertThat(tempPath.getFileName().toString(), containsString("temp-beam-")); } /** @@ -173,28 +177,28 @@ public class XmlSinkTest { XmlIO.<Bird>write() .withRecordClass(Bird.class) .withRootElement(testRootElement) - .toFilenamePrefix(testFilePrefix) + .to(testFilePrefix) .createSink() .createWriteOperation(); - XmlWriter<Bird> writer = writeOp.createWriter(options); + XmlWriter<Bird> writer = writeOp.createWriter(); Path outputPath = new File(testFilePrefix).toPath(); - Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath(); - assertEquals(outputPath.getParent(), tempPath.getParent()); - assertThat( - tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); + Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory().toString()) + .toPath(); + assertThat(tempPath.getParent(), equalTo(outputPath.getParent())); + assertThat(tempPath.getFileName().toString(), containsString("temp-beam-")); assertNotNull(writer.marshaller); } @Test public void testDisplayData() { XmlIO.Write<Integer> write = XmlIO.<Integer>write() - .toFilenamePrefix("foobar") + .to(testFilePrefix) .withRootElement("bird") .withRecordClass(Integer.class); DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml")); + assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml")); assertThat(displayData, hasDisplayItem("rootElement", "bird")); assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); }