Makes FileBasedSink use a temporary directory. When writing to /path/to/foo, temporary files used to be written to a path like /path/to/foo-temp-$uid, i.e. as siblings of the final output. That could lead to issues like http://stackoverflow.com/q/39822859/278042
Now, temporary files are written to a path like: /path/to/temp-beam-foo-$date/$uid. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8c1008c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8c1008c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8c1008c4 Branch: refs/heads/apex-runner Commit: 8c1008c42615dd7704fc2df02b140af1c8332c72 Parents: c6c41ea Author: Eugene Kirpichov <kirpic...@google.com> Authored: Tue Oct 4 15:23:27 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Thu Nov 3 12:39:12 2016 -0700 ---------------------------------------------------------------------- .../examples/MinimalWordCountJava8Test.java | 2 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 34 +++++++++++++++----- .../beam/sdk/util/FileIOChannelFactory.java | 8 +++-- .../beam/sdk/util/GcsIOChannelFactory.java | 8 ++++- .../apache/beam/sdk/util/IOChannelFactory.java | 4 +++ .../org/apache/beam/sdk/util/gcsfs/GcsPath.java | 13 ++++++-- .../apache/beam/sdk/io/FileBasedSinkTest.java | 23 +++++++------ .../org/apache/beam/sdk/io/XmlSinkTest.java | 16 +++++++-- .../apache/beam/sdk/util/gcsfs/GcsPathTest.java | 25 ++++++++++++++ 9 files changed, 107 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index 1819219..f373343 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ 
b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -68,7 +68,7 @@ public class MinimalWordCountJava8Test implements Serializable { .apply(MapElements .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) .withOutputType(TypeDescriptors.strings())) - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); + .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix")); } private GcsUtil buildMockGcsUtil() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 4355962..3d0fe04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -31,6 +31,7 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; @@ -53,6 +54,8 @@ import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -317,17 +320,16 @@ public abstract class FileBasedSink<T> extends Sink<T> { protected final String baseTemporaryFilename; /** - * Name separator for temporary files. Temporary files will be named - * {@code {baseTemporaryFilename}-temp-{bundleId}}. 
- */ - protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; - - /** * Build a temporary filename using the temporary filename separator with the given prefix and * suffix. */ protected static final String buildTemporaryFilename(String prefix, String suffix) { - return prefix + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + suffix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(prefix); + return factory.resolve(prefix, suffix); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** @@ -337,7 +339,23 @@ public abstract class FileBasedSink<T> extends Sink<T> { * @param sink the FileBasedSink that will be used to configure this write operation. */ public FileBasedWriteOperation(FileBasedSink<T> sink) { - this(sink, sink.baseOutputFilename); + this(sink, buildTemporaryDirectoryName(sink.getBaseOutputFilename())); + } + + private static String buildTemporaryDirectoryName(String baseOutputFilename) { + try { + IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename); + Path baseOutputPath = factory.toPath(baseOutputFilename); + return baseOutputPath + .resolveSibling( + "temp-beam-" + + baseOutputPath.getFileName() + + "-" + + Instant.now().toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"))) + .toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index a11231b..2d2c0c6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -148,7 +148,11 @@ public 
class FileIOChannelFactory implements IOChannelFactory { @Override public String resolve(String path, String other) throws IOException { - Path p = specToFile(path).toPath(); - return p.resolve(other).toString(); + return toPath(path).resolve(other).toString(); + } + + @Override + public Path toPath(String path) { + return specToFile(path).toPath(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java index 14090e3..652e468 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -81,6 +82,11 @@ public class GcsIOChannelFactory implements IOChannelFactory { @Override public String resolve(String path, String other) throws IOException { - return GcsPath.fromUri(path).resolve(other).toString(); + return toPath(path).resolve(other).toString(); + } + + @Override + public Path toPath(String path) { + return GcsPath.fromUri(path); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java index ae6c507..4e55036 100644 
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; import java.util.Collection; /** @@ -99,4 +100,7 @@ public interface IOChannelFactory { * dependent and therefore unspecified. */ String resolve(String path, String other) throws IOException; + + /** Converts the given string to a {@link Path}. */ + Path toPath(String path); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java index bfcd6da..863b01b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java @@ -261,7 +261,12 @@ public class GcsPath implements Path { @Override public GcsPath getFileName() { - throw new UnsupportedOperationException(); + int nameCount = getNameCount(); + if (nameCount < 2) { + throw new UnsupportedOperationException( + "Can't get filename from root path in the bucket: " + this); + } + return getName(nameCount - 1); } /** @@ -436,7 +441,11 @@ public class GcsPath implements Path { @Override public Path resolveSibling(String other) { - throw new UnsupportedOperationException(); + if (getNameCount() < 2) { + throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this); + } + GcsPath parent = getParent(); + return (parent == null) ? 
fromUri(other) : parent.resolve(other); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 66bb661..8301afc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -89,7 +89,7 @@ public class FileBasedSinkTest { public void testWriter() throws Exception { String testUid = "testId"; String expectedFilename = - getBaseTempFilename() + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid; + getBaseTempFilename() + "/" + testUid; SimpleSink.SimpleWriter writer = buildWriter(); List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird"); @@ -193,8 +193,7 @@ public class FileBasedSinkTest { runFinalize(writeOp, files, false); // create a temporary file - tmpFolder.newFile( - baseTemporaryFilename + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + "1"); + tmpFolder.newFile(baseTemporaryFilename + "/1"); runFinalize(writeOp, files, false); } @@ -217,7 +216,9 @@ public class FileBasedSinkTest { for (int i = 0; i < numFiles; i++) { String temporaryFilename = FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i); - File tmpFile = tmpFolder.newFile(temporaryFilename); + File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename); + tmpFile.getParentFile().mkdirs(); + assertTrue(tmpFile.createNewFile()); temporaryFiles.add(tmpFile); } @@ -263,8 +264,10 @@ public class FileBasedSinkTest { List<File> temporaryFiles = new ArrayList<>(); List<File> outputFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - File tmpFile = tmpFolder.newFile( + File 
tmpFile = new File(tmpFolder.getRoot(), FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i)); + tmpFile.getParentFile().mkdirs(); + assertTrue(tmpFile.createNewFile()); temporaryFiles.add(tmpFile); File outputFile = tmpFolder.newFile(baseOutputFilename + i); outputFiles.add(outputFile); @@ -496,11 +499,13 @@ public class FileBasedSinkTest { @Test public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception { final String testUid = "testId"; - final String expectedFilename = - getBaseOutputFilename() + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid; - final FileBasedWriter<String> writer = + SimpleSink.SimpleWriteOperation writeOp = new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory()) - .createWriteOperation(null).createWriter(null); + .createWriteOperation(null); + final FileBasedWriter<String> writer = + writeOp.createWriter(null); + final String expectedFilename = + writeOp.baseTemporaryFilename + "/" + testUid; final List<String> expected = new ArrayList<>(); expected.add("header"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java index 2788ea6..653a9d0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -28,6 +29,7 @@ import 
java.io.File; import java.io.FileOutputStream; import java.io.FileReader; import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -59,7 +61,7 @@ public class XmlSinkTest { private Class<Bird> testClass = Bird.class; private String testRootElement = "testElement"; - private String testFilePrefix = "testPrefix"; + private String testFilePrefix = "/path/to/testPrefix"; /** * An XmlWriter correctly writes objects as Xml elements with an enclosing root element. @@ -143,7 +145,11 @@ public class XmlSinkTest { assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename); assertEquals(testRootElement, writeOp.getSink().rootElementName); assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension); - assertEquals(testFilePrefix, writeOp.baseTemporaryFilename); + Path outputPath = new File(testFilePrefix).toPath(); + Path tempPath = new File(writeOp.baseTemporaryFilename).toPath(); + assertEquals(outputPath.getParent(), tempPath.getParent()); + assertThat( + tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); } /** @@ -156,7 +162,11 @@ public class XmlSinkTest { XmlSink.writeOf(testClass, testRootElement, testFilePrefix) .createWriteOperation(options); XmlWriter<Bird> writer = writeOp.createWriter(options); - assertEquals(testFilePrefix, writer.getWriteOperation().baseTemporaryFilename); + Path outputPath = new File(testFilePrefix).toPath(); + Path tempPath = new File(writer.getWriteOperation().baseTemporaryFilename).toPath(); + assertEquals(outputPath.getParent(), tempPath.getParent()); + assertThat( + tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); assertEquals(testRootElement, writer.getWriteOperation().getSink().rootElementName); assertNotNull(writer.marshaller); } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java index 5c86184..426fb16 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java @@ -31,7 +31,9 @@ import java.util.Iterator; import java.util.List; import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -73,6 +75,9 @@ public class GcsPathTest { new TestCase("gs://bucket/", "bucket") ); + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testGcsPathParsing() throws Exception { for (TestCase testCase : PATH_TEST_CASES) { @@ -237,6 +242,26 @@ public class GcsPathTest { } @Test + public void testGetFileName() { + assertEquals("foo", GcsPath.fromUri("gs://bucket/bar/foo").getFileName().toString()); + assertEquals("foo", GcsPath.fromUri("gs://bucket/foo").getFileName().toString()); + thrown.expect(UnsupportedOperationException.class); + GcsPath.fromUri("gs://bucket/").getFileName(); + } + + @Test + public void testResolveSibling() { + assertEquals( + "gs://bucket/bar/moo", + GcsPath.fromUri("gs://bucket/bar/foo").resolveSibling("moo").toString()); + assertEquals( + "gs://bucket/moo", + GcsPath.fromUri("gs://bucket/foo").resolveSibling("moo").toString()); + thrown.expect(UnsupportedOperationException.class); + GcsPath.fromUri("gs://bucket/").resolveSibling("moo"); + } + + @Test public void testCompareTo() { GcsPath a = GcsPath.fromComponents("bucket", "a"); GcsPath b = 
GcsPath.fromComponents("bucket", "b");