[ 
https://issues.apache.org/jira/browse/BEAM-5036?focusedWorklogId=152833&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152833
 ]

ASF GitHub Bot logged work on BEAM-5036:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 18:09
            Start Date: 09/Oct/18 18:09
    Worklog Time Spent: 10m 
      Work Description: timrobertson100 closed pull request #6289: [BEAM-5036] 
Optimize the FileBasedSink WriteOperation.moveToOutput()
URL: https://github.com/apache/beam/pull/6289
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 92b2382e365..66a94ae087b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -757,9 +757,14 @@ final void moveToOutputFiles(
             "Will copy temporary file {} to final location {}", 
entry.getKey(), entry.getValue());
       }
       // During a failure case, files may have been deleted in an earlier 
step. Thus
-      // we ignore missing files here.
-      FileSystems.copy(srcFiles, dstFiles, 
StandardMoveOptions.IGNORE_MISSING_FILES);
-      removeTemporaryFiles(srcFiles);
+      // we ignore missing files here. It is possible that files already exist 
in the
+      // destination and we wish to replace them (e.g. a previous job run)
+      FileSystems.rename(
+          srcFiles,
+          dstFiles,
+          StandardMoveOptions.IGNORE_MISSING_FILES,
+          StandardMoveOptions.REPLACE_EXISTING);
+      removeTemporaryFiles(srcFiles); // removes temp folder if applicable
     }
 
     /**
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index c2977d0527c..eb22a76f33e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -36,6 +36,7 @@
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -298,6 +299,12 @@ public static void copy(
    *
    * <p>It doesn't support renaming globs.
    *
+   * <p>If the underlying file system reports that a target file already 
exists and moveOptions
+   * contains {@code StandardMoveOptions.REPLACE_EXISTING} then all target 
files that existed prior
+   * to calling rename will be deleted and the rename retried. When a retry is 
attempted then
+   * missing files from the source will be ignored. Some filesystem 
implementations will <em>always
+   * overwrite</em>.
+   *
    * @param srcResourceIds the references of the source resources
    * @param destResourceIds the references of the destination resources
    */
@@ -310,10 +317,11 @@ public static void rename(
       return;
     }
 
+    Set<MoveOptions> options = Sets.newHashSet(moveOptions);
+
     List<ResourceId> srcToRename = srcResourceIds;
     List<ResourceId> destToRename = destResourceIds;
-    if (Sets.newHashSet(moveOptions)
-        .contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
+    if 
(options.contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
       KV<List<ResourceId>, List<ResourceId>> existings =
           filterMissingFiles(srcResourceIds, destResourceIds);
       srcToRename = existings.getKey();
@@ -322,8 +330,71 @@ public static void rename(
     if (srcToRename.isEmpty()) {
       return;
     }
-    getFileSystemInternal(srcToRename.iterator().next().getScheme())
-        .rename(srcToRename, destToRename);
+
+    boolean replaceExisting =
+        options.contains(MoveOptions.StandardMoveOptions.REPLACE_EXISTING) ? 
true : false;
+    rename(
+        getFileSystemInternal(srcToRename.iterator().next().getScheme()),
+        srcToRename,
+        destToRename,
+        replaceExisting);
+  }
+
+  /**
+   * Executes a rename of the sources, all of which must exist, using the provided 
filesystem.
+   *
+   * <p>If replaceExisting is enabled and filesystem throws {@code 
FileAlreadyExistsException} then
+   * an attempt to delete the destination is made and the rename is retried. 
Some filesystem
+   * implementations may apply this automatically without throwing.
+   *
+   * @param fileSystem The filesystem in use
+   * @param srcResourceIds The source resources to move
+   * @param destResourceIds The destinations for the sources to move to (must 
be same length as
+   *     srcResourceIds)
+   * @param replaceExisting If existing files in destination should be 
overwritten
+   * @throws IOException If the rename could not be completed
+   */
+  @VisibleForTesting
+  static void rename(
+      FileSystem fileSystem,
+      List<ResourceId> srcResourceIds,
+      List<ResourceId> destResourceIds,
+      boolean replaceExisting)
+      throws IOException {
+    try {
+      fileSystem.rename(srcResourceIds, destResourceIds);
+    } catch (FileAlreadyExistsException e) {
+      if (replaceExisting) {
+
+        // The filesystem has reported a target that existed prior to calling 
rename. Some files may
+        // have been moved successfully but there are no guarantees on which 
as some filesystems
+        // batch and run in parallel asynchronously. We determine the state, 
delete every dest file
+        // whose src still exists, and issue a retry. The retry skips all src 
files that no longer exist.
+
+        List<MatchResult> matchResultsSrc = matchResources(srcResourceIds);
+        List<MatchResult> matchResultsDest = matchResources(destResourceIds);
+        List<ResourceId> destResourceIdsToDelete = new ArrayList<>();
+        List<ResourceId> srcResourceIdsToRetry = new ArrayList<>();
+        List<ResourceId> destResourceIdsToRetry = new ArrayList<>();
+
+        for (int i = 0; i < matchResultsSrc.size(); ++i) {
+          boolean srcExists = 
!matchResultsSrc.get(i).status().equals(Status.NOT_FOUND);
+          boolean destExists = 
!matchResultsDest.get(i).status().equals(Status.NOT_FOUND);
+          if (srcExists) {
+            srcResourceIdsToRetry.add(srcResourceIds.get(i));
+            destResourceIdsToRetry.add(destResourceIds.get(i));
+            if (destExists) {
+              destResourceIdsToDelete.add(destResourceIds.get(i));
+            }
+          }
+        }
+        fileSystem.delete(destResourceIdsToDelete);
+        fileSystem.rename(srcResourceIdsToRetry, destResourceIdsToRetry);
+
+      } else {
+        throw e;
+      }
+    }
   }
 
   /**
@@ -379,6 +450,7 @@ public ResourceId apply(@Nonnull Metadata input) {
         .delete(resourceIdsToDelete);
   }
 
+  // filters files that do not exist in srcResourceIds
   private static KV<List<ResourceId>, List<ResourceId>> filterMissingFiles(
       List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) 
throws IOException {
     validateSrcDestLists(srcResourceIds, destResourceIds);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 9ca29016fd8..7b847cf850f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -19,6 +19,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -31,6 +32,7 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.CopyOption;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
@@ -80,8 +82,20 @@
 class LocalFileSystem extends FileSystem<LocalResourceId> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileSystem.class);
+  private static final CopyOption[] DEFAULT_RENAME_COPY_OPTIONS =
+      new CopyOption[] {StandardCopyOption.ATOMIC_MOVE};
 
-  LocalFileSystem() {}
+  private final CopyOption[] renameCopyOption;
+
+  LocalFileSystem() {
+    renameCopyOption = DEFAULT_RENAME_COPY_OPTIONS;
+  }
+
+  // Exists to allow testing behaviour of the FileSystems utility methods
+  @VisibleForTesting
+  LocalFileSystem(CopyOption[] renameCopyOption) {
+    this.renameCopyOption = renameCopyOption;
+  }
 
   @Override
   protected List<MatchResult> match(List<String> specs) throws IOException {
@@ -169,11 +183,7 @@ protected void rename(List<LocalResourceId> 
srcResourceIds, List<LocalResourceId
             dst.getPath());
       }
       // Rename the source file, replacing the existing destination.
-      Files.move(
-          src.getPath(),
-          dst.getPath(),
-          StandardCopyOption.REPLACE_EXISTING,
-          StandardCopyOption.ATOMIC_MOVE);
+      Files.move(src.getPath(), dst.getPath(), renameCopyOption);
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
index 5ffcdae9964..f02fbe8582f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java
@@ -28,5 +28,6 @@
   /** Defines the standard {@link MoveOptions}. */
   enum StandardMoveOptions implements MoveOptions {
     IGNORE_MISSING_FILES,
+    REPLACE_EXISTING
   }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index 0fbeb71325d..89e9cebb622 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -30,6 +30,8 @@
 import java.io.Writer;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.CopyOption;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -168,6 +170,7 @@ public void testRenameIgnoreMissingFiles() throws Exception 
{
 
     createFileWithContent(srcPath1, "content1");
     createFileWithContent(srcPath3, "content3");
+    createFileWithContent(destPath3, "overwrite-me"); // target exists and 
will be replaced
 
     FileSystems.rename(
         toResourceIds(
@@ -186,6 +189,56 @@ public void testRenameIgnoreMissingFiles() throws 
Exception {
         containsInAnyOrder("content3"));
   }
 
+  @Test
+  public void testRenameOverwriteExistingFilesThrows() throws Exception {
+    // Simulate a filesystem which will not overwrite existing files
+    FileSystem<LocalResourceId> fs = new LocalFileSystem(new CopyOption[0]);
+    thrown.expect(FileAlreadyExistsException.class);
+    executeRename(fs, false);
+  }
+
+  @Test
+  public void testRenameOverwriteExistingFiles() throws Exception {
+    // Simulate a filesystem which will not overwrite existing files
+    FileSystem<LocalResourceId> fs = new LocalFileSystem(new CopyOption[0]);
+    executeRename(fs, true);
+  }
+
+  /**
+   * Creates two resources renaming them where one of the target files already 
exists.
+   *
+   * @param fs The internal file system implementation
+   * @param replaceExisting True if an attempt to replace existing files is 
desired
+   * @throws Exception On error creating the files or renaming
+   */
+  private void executeRename(FileSystem<LocalResourceId> fs, boolean 
replaceExisting)
+      throws Exception {
+    Path srcPath1 = temporaryFolder.newFile().toPath();
+    Path srcPath2 = temporaryFolder.newFile().toPath();
+
+    Path destPath1 = srcPath1.resolveSibling("dest1");
+    Path destPath2 = srcPath1.resolveSibling("dest2");
+
+    createFileWithContent(srcPath1, "content1");
+    createFileWithContent(srcPath2, "content2");
+    createFileWithContent(destPath2, "existing target");
+
+    FileSystems.rename(
+        fs,
+        toResourceIds(ImmutableList.of(srcPath1, srcPath2), false /* 
isDirectory */),
+        toResourceIds(ImmutableList.of(destPath1, destPath2), false /* 
isDirectory */),
+        replaceExisting);
+
+    assertFalse(srcPath1.toFile().exists());
+    assertFalse(srcPath2.toFile().exists());
+    assertThat(
+        Files.readLines(destPath1.toFile(), StandardCharsets.UTF_8),
+        containsInAnyOrder("content1"));
+    assertThat(
+        Files.readLines(destPath2.toFile(), StandardCharsets.UTF_8),
+        containsInAnyOrder("content2")); // overwrite the data
+  }
+
   @Test
   public void testValidMatchNewResourceForLocalFileSystem() {
     assertEquals("file", FileSystems.matchNewResource("/tmp/f1", 
false).getScheme());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152833)
    Time Spent: 11h  (was: 10h 50m)

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> ------------------------------------------------------
>
>                 Key: BEAM-5036
>                 URL: https://issues.apache.org/jira/browse/BEAM-5036
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>    Affects Versions: 2.5.0
>            Reporter: Jozef Vilcek
>            Assignee: Tim Robertson
>            Priority: Major
>          Time Spent: 11h
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement move by 
> copy+delete. It would be better to use rename(), which can be much more 
> efficient for some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to this 
> for the case of the HDFS filesystem.
> Feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to