Makes FileBasedSink use a temporary directory

When writing to /path/to/foo, temporary files would be
written to /path/too/foo-temp-$uid (or something like that),
i.e. as siblings of the final output. That could lead
to issues like http://stackoverflow.com/q/39822859/278042

Now, temporary files are written to a path like:
/path/too/temp-beam-foo-$date/$uid.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8c1008c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8c1008c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8c1008c4

Branch: refs/heads/apex-runner
Commit: 8c1008c42615dd7704fc2df02b140af1c8332c72
Parents: c6c41ea
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Tue Oct 4 15:23:27 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Thu Nov 3 12:39:12 2016 -0700

----------------------------------------------------------------------
 .../examples/MinimalWordCountJava8Test.java     |  2 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 34 +++++++++++++++-----
 .../beam/sdk/util/FileIOChannelFactory.java     |  8 +++--
 .../beam/sdk/util/GcsIOChannelFactory.java      |  8 ++++-
 .../apache/beam/sdk/util/IOChannelFactory.java  |  4 +++
 .../org/apache/beam/sdk/util/gcsfs/GcsPath.java | 13 ++++++--
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 23 +++++++------
 .../org/apache/beam/sdk/io/XmlSinkTest.java     | 16 +++++++--
 .../apache/beam/sdk/util/gcsfs/GcsPathTest.java | 25 ++++++++++++++
 9 files changed, 107 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git 
a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
 
b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 1819219..f373343 100644
--- 
a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ 
b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -68,7 +68,7 @@ public class MinimalWordCountJava8Test implements 
Serializable {
      .apply(MapElements
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + 
wordCount.getValue())
          .withOutputType(TypeDescriptors.strings()))
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
   }
 
   private GcsUtil buildMockGcsUtil() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 4355962..3d0fe04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -31,6 +31,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -53,6 +54,8 @@ import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -317,17 +320,16 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected final String baseTemporaryFilename;
 
     /**
-     * Name separator for temporary files. Temporary files will be named
-     * {@code {baseTemporaryFilename}-temp-{bundleId}}.
-     */
-    protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-";
-
-    /**
      * Build a temporary filename using the temporary filename separator with 
the given prefix and
      * suffix.
      */
     protected static final String buildTemporaryFilename(String prefix, String 
suffix) {
-      return prefix + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + 
suffix;
+      try {
+        IOChannelFactory factory = IOChannelUtils.getFactory(prefix);
+        return factory.resolve(prefix, suffix);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     /**
@@ -337,7 +339,23 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * @param sink the FileBasedSink that will be used to configure this write 
operation.
      */
     public FileBasedWriteOperation(FileBasedSink<T> sink) {
-      this(sink, sink.baseOutputFilename);
+      this(sink, buildTemporaryDirectoryName(sink.getBaseOutputFilename()));
+    }
+
+    private static String buildTemporaryDirectoryName(String 
baseOutputFilename) {
+      try {
+        IOChannelFactory factory = 
IOChannelUtils.getFactory(baseOutputFilename);
+        Path baseOutputPath = factory.toPath(baseOutputFilename);
+        return baseOutputPath
+            .resolveSibling(
+                "temp-beam-"
+                    + baseOutputPath.getFileName()
+                    + "-"
+                    + 
Instant.now().toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
+            .toString();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index a11231b..2d2c0c6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -148,7 +148,11 @@ public class FileIOChannelFactory implements 
IOChannelFactory {
 
   @Override
   public String resolve(String path, String other) throws IOException {
-    Path p = specToFile(path).toPath();
-    return p.resolve(other).toString();
+    return toPath(path).resolve(other).toString();
+  }
+
+  @Override
+  public Path toPath(String path) {
+    return specToFile(path).toPath();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index 14090e3..652e468 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -81,6 +82,11 @@ public class GcsIOChannelFactory implements IOChannelFactory 
{
 
   @Override
   public String resolve(String path, String other) throws IOException {
-    return GcsPath.fromUri(path).resolve(other).toString();
+    return toPath(path).resolve(other).toString();
+  }
+
+  @Override
+  public Path toPath(String path) {
+    return GcsPath.fromUri(path);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
index ae6c507..4e55036 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
 import java.util.Collection;
 
 /**
@@ -99,4 +100,7 @@ public interface IOChannelFactory {
    * dependent and therefore unspecified.
    */
   String resolve(String path, String other) throws IOException;
+
+  /** Converts the given string to a {@link Path}. */
+  Path toPath(String path);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
index bfcd6da..863b01b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
@@ -261,7 +261,12 @@ public class GcsPath implements Path {
 
   @Override
   public GcsPath getFileName() {
-    throw new UnsupportedOperationException();
+    int nameCount = getNameCount();
+    if (nameCount < 2) {
+      throw new UnsupportedOperationException(
+          "Can't get filename from root path in the bucket: " + this);
+    }
+    return getName(nameCount - 1);
   }
 
   /**
@@ -436,7 +441,11 @@ public class GcsPath implements Path {
 
   @Override
   public Path resolveSibling(String other) {
-    throw new UnsupportedOperationException();
+    if (getNameCount() < 2) {
+      throw new UnsupportedOperationException("Can't resolve the sibling of a 
root path: " + this);
+    }
+    GcsPath parent = getParent();
+    return (parent == null) ? fromUri(other) : parent.resolve(other);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 66bb661..8301afc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -89,7 +89,7 @@ public class FileBasedSinkTest {
   public void testWriter() throws Exception {
     String testUid = "testId";
     String expectedFilename =
-        getBaseTempFilename() + 
FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid;
+        getBaseTempFilename() + "/" + testUid;
     SimpleSink.SimpleWriter writer = buildWriter();
 
     List<String> values = Arrays.asList("sympathetic vulture", "boresome 
hummingbird");
@@ -193,8 +193,7 @@ public class FileBasedSinkTest {
     runFinalize(writeOp, files, false);
 
     // create a temporary file
-    tmpFolder.newFile(
-        baseTemporaryFilename + 
FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + "1");
+    tmpFolder.newFile(baseTemporaryFilename + "/1");
 
     runFinalize(writeOp, files, false);
   }
@@ -217,7 +216,9 @@ public class FileBasedSinkTest {
     for (int i = 0; i < numFiles; i++) {
       String temporaryFilename =
           
FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i);
-      File tmpFile = tmpFolder.newFile(temporaryFilename);
+      File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename);
+      tmpFile.getParentFile().mkdirs();
+      assertTrue(tmpFile.createNewFile());
       temporaryFiles.add(tmpFile);
     }
 
@@ -263,8 +264,10 @@ public class FileBasedSinkTest {
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      File tmpFile = tmpFolder.newFile(
+      File tmpFile = new File(tmpFolder.getRoot(),
           
FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i));
+      tmpFile.getParentFile().mkdirs();
+      assertTrue(tmpFile.createNewFile());
       temporaryFiles.add(tmpFile);
       File outputFile = tmpFolder.newFile(baseOutputFilename + i);
       outputFiles.add(outputFile);
@@ -496,11 +499,13 @@ public class FileBasedSinkTest {
   @Test
   public void testFileBasedWriterWithWritableByteChannelFactory() throws 
Exception {
     final String testUid = "testId";
-    final String expectedFilename =
-        getBaseOutputFilename() + 
FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid;
-    final FileBasedWriter<String> writer =
+    SimpleSink.SimpleWriteOperation writeOp =
         new SimpleSink(getBaseOutputFilename(), "txt", new 
DrunkWritableByteChannelFactory())
-            .createWriteOperation(null).createWriter(null);
+            .createWriteOperation(null);
+    final FileBasedWriter<String> writer =
+        writeOp.createWriter(null);
+    final String expectedFilename =
+        writeOp.baseTemporaryFilename + "/" + testUid;
 
     final List<String> expected = new ArrayList<>();
     expected.add("header");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 2788ea6..653a9d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -28,6 +29,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -59,7 +61,7 @@ public class XmlSinkTest {
 
   private Class<Bird> testClass = Bird.class;
   private String testRootElement = "testElement";
-  private String testFilePrefix = "testPrefix";
+  private String testFilePrefix = "/path/to/testPrefix";
 
   /**
    * An XmlWriter correctly writes objects as Xml elements with an enclosing 
root element.
@@ -143,7 +145,11 @@ public class XmlSinkTest {
     assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename);
     assertEquals(testRootElement, writeOp.getSink().rootElementName);
     assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
-    assertEquals(testFilePrefix, writeOp.baseTemporaryFilename);
+    Path outputPath = new File(testFilePrefix).toPath();
+    Path tempPath = new File(writeOp.baseTemporaryFilename).toPath();
+    assertEquals(outputPath.getParent(), tempPath.getParent());
+    assertThat(
+        tempPath.getFileName().toString(), containsString("temp-beam-" + 
outputPath.getFileName()));
   }
 
   /**
@@ -156,7 +162,11 @@ public class XmlSinkTest {
         XmlSink.writeOf(testClass, testRootElement, testFilePrefix)
             .createWriteOperation(options);
     XmlWriter<Bird> writer = writeOp.createWriter(options);
-    assertEquals(testFilePrefix, 
writer.getWriteOperation().baseTemporaryFilename);
+    Path outputPath = new File(testFilePrefix).toPath();
+    Path tempPath = new 
File(writer.getWriteOperation().baseTemporaryFilename).toPath();
+    assertEquals(outputPath.getParent(), tempPath.getParent());
+    assertThat(
+        tempPath.getFileName().toString(), containsString("temp-beam-" + 
outputPath.getFileName()));
     assertEquals(testRootElement, 
writer.getWriteOperation().getSink().rootElementName);
     assertNotNull(writer.marshaller);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8c1008c4/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
index 5c86184..426fb16 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
@@ -31,7 +31,9 @@ import java.util.Iterator;
 import java.util.List;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -73,6 +75,9 @@ public class GcsPathTest {
       new TestCase("gs://bucket/", "bucket")
   );
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void testGcsPathParsing() throws Exception {
     for (TestCase testCase : PATH_TEST_CASES) {
@@ -237,6 +242,26 @@ public class GcsPathTest {
   }
 
   @Test
+  public void testGetFileName() {
+    assertEquals("foo", 
GcsPath.fromUri("gs://bucket/bar/foo").getFileName().toString());
+    assertEquals("foo", 
GcsPath.fromUri("gs://bucket/foo").getFileName().toString());
+    thrown.expect(UnsupportedOperationException.class);
+    GcsPath.fromUri("gs://bucket/").getFileName();
+  }
+
+  @Test
+  public void testResolveSibling() {
+    assertEquals(
+        "gs://bucket/bar/moo",
+        
GcsPath.fromUri("gs://bucket/bar/foo").resolveSibling("moo").toString());
+    assertEquals(
+        "gs://bucket/moo",
+        GcsPath.fromUri("gs://bucket/foo").resolveSibling("moo").toString());
+    thrown.expect(UnsupportedOperationException.class);
+    GcsPath.fromUri("gs://bucket/").resolveSibling("moo");
+  }
+
+  @Test
   public void testCompareTo() {
     GcsPath a = GcsPath.fromComponents("bucket", "a");
     GcsPath b = GcsPath.fromComponents("bucket", "b");

Reply via email to