Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 c6b1125a2 -> 7985c5fdc


HADOOP-15262. AliyunOSS: move files under a directory in parallel when rename a 
directory. Contributed by Jinhu Wu.

(cherry picked from commit d67a5e2dec5c60d96b0c216182891cdfd7832ac5)
(cherry picked from commit 2285afb32e71622b3dab5051247a1d099cfcbe85)
(cherry picked from commit 322520eb76cdcef25190495ccf98b3ca39907f58)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7985c5fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7985c5fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7985c5fd

Branch: refs/heads/branch-3.0
Commit: 7985c5fdc9fe26e7771a86a5004bf90fcbd8af71
Parents: c6b1125
Author: Sammi Chen <sammi.c...@intel.com>
Authored: Mon Mar 19 15:02:37 2018 +0800
Committer: Sammi Chen <sammi.c...@intel.com>
Committed: Mon Mar 19 15:44:15 2018 +0800

----------------------------------------------------------------------
 .../fs/aliyun/oss/AliyunOSSCopyFileContext.java |  70 ++++++++++
 .../fs/aliyun/oss/AliyunOSSCopyFileTask.java    |  65 ++++++++++
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      |  51 +++++++-
 .../apache/hadoop/fs/aliyun/oss/Constants.java  |  16 +++
 .../oss/TestAliyunOSSFileSystemContract.java    | 130 +++++++++++++++++++
 5 files changed, 330 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7985c5fd/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java
new file mode 100644
index 0000000..a843805
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Used by {@link AliyunOSSFileSystem} and {@link AliyunOSSCopyFileTask}
+ * as the copy context. It contains some variables used in the copy process.
+ */
+public class AliyunOSSCopyFileContext {
+  private final ReentrantLock lock = new ReentrantLock();
+
+  private Condition readyCondition = lock.newCondition();
+
+  private boolean copyFailure;
+  private int copiesFinish;
+
+  public AliyunOSSCopyFileContext() {
+    copyFailure = false;
+    copiesFinish = 0;
+  }
+
+  public void lock() {
+    lock.lock();
+  }
+
+  public void unlock() {
+    lock.unlock();
+  }
+
+  public void awaitAllFinish(int copiesToFinish) throws InterruptedException {
+    while (this.copiesFinish != copiesToFinish) {
+      readyCondition.await();
+    }
+  }
+
+  public void signalAll() {
+    readyCondition.signalAll();
+  }
+
+  public boolean isCopyFailure() {
+    return copyFailure;
+  }
+
+  public void setCopyFailure(boolean copyFailure) {
+    this.copyFailure = copyFailure;
+  }
+
+  public void incCopiesFinish() {
+    ++copiesFinish;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7985c5fd/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java
new file mode 100644
index 0000000..42cd17b
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link AliyunOSSFileSystem} as a task that is submitted
+ * to the thread pool to accelerate the copy process.
+ * Each AliyunOSSCopyFileTask copies one file from src path to dst path.
+ */
+public class AliyunOSSCopyFileTask implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AliyunOSSCopyFileTask.class);
+
+  private AliyunOSSFileSystemStore store;
+  private String srcKey;
+  private String dstKey;
+  private AliyunOSSCopyFileContext copyFileContext;
+
+  public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
+      String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
+    this.store = store;
+    this.srcKey = srcKey;
+    this.dstKey = dstKey;
+    this.copyFileContext = copyFileContext;
+  }
+
+  @Override
+  public void run() {
+    boolean fail = false;
+    try {
+      store.copyFile(srcKey, dstKey);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown when copy from "
+          + srcKey + " to " + dstKey +  ", exception: " + e);
+      fail = true;
+    } finally {
+      copyFileContext.lock();
+      if (fail) {
+        copyFileContext.setCopyFailure(fail);
+      }
+      copyFileContext.incCopiesFinish();
+      copyFileContext.signalAll();
+      copyFileContext.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7985c5fd/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index afff223..b3c63d3 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -24,9 +24,11 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -70,7 +72,9 @@ public class AliyunOSSFileSystem extends FileSystem {
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
   private int maxReadAheadPartNumber;
+  private int maxConcurrentCopyTasksPerDir;
   private ListeningExecutorService boundedThreadPool;
+  private ListeningExecutorService boundedCopyThreadPool;
 
   private static final PathFilter DEFAULT_FILTER = new PathFilter() {
     @Override
@@ -90,6 +94,7 @@ public class AliyunOSSFileSystem extends FileSystem {
     try {
       store.close();
       boundedThreadPool.shutdown();
+      boundedCopyThreadPool.shutdown();
     } finally {
       super.close();
     }
@@ -331,6 +336,23 @@ public class AliyunOSSFileSystem extends FileSystem {
 
     this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
         threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
+
+    maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
+        Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
+        Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT);
+
+    int maxCopyThreads = AliyunOSSUtils.intPositiveOption(conf,
+        Constants.MAX_COPY_THREADS_NUM_KEY,
+        Constants.MAX_COPY_THREADS_DEFAULT);
+
+    int maxCopyTasks = AliyunOSSUtils.intPositiveOption(conf,
+        Constants.MAX_COPY_TASKS_KEY,
+        Constants.MAX_COPY_TASKS_DEFAULT);
+
+    this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
+        maxCopyThreads, maxCopyTasks, 60L,
+        TimeUnit.SECONDS, "oss-copy-unbounded");
+
     setConf(conf);
   }
 
@@ -653,14 +675,30 @@ public class AliyunOSSFileSystem extends FileSystem {
     }
 
     store.storeEmptyFile(dstKey);
+    AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
+    ExecutorService executorService = MoreExecutors.listeningDecorator(
+        new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
+            maxConcurrentCopyTasksPerDir, true));
     ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
     statistics.incrementReadOps(1);
     // Copy files from src folder to dst
+    int copiesToFinish = 0;
     while (true) {
       for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
         String newKey =
             dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
-        store.copyFile(objectSummary.getKey(), newKey);
+
+        //copy operation just copies metadata, oss will support shallow copy
+        executorService.execute(new AliyunOSSCopyFileTask(
+            store, objectSummary.getKey(), newKey, copyFileContext));
+        copiesToFinish++;
+        // No need to call lock() here.
+        // It's OK to copy one more file if the rename operation has failed.
+        // Reducing the number of lock() calls also improves performance.
+        if (copyFileContext.isCopyFailure()) {
+          //some error occurs, break
+          break;
+        }
       }
       if (objects.isTruncated()) {
         String nextMarker = objects.getNextMarker();
@@ -670,7 +708,16 @@ public class AliyunOSSFileSystem extends FileSystem {
         break;
       }
     }
-    return true;
+    // Wait for in-progress copy operations to finish.
+    copyFileContext.lock();
+    try {
+      copyFileContext.awaitAllFinish(copiesToFinish);
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted when wait copies to finish");
+    } finally {
+      copyFileContext.unlock();
+    }
+    return !copyFileContext.isCopyFailure();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7985c5fd/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index 410adc9..283927c 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -110,6 +110,22 @@ public final class Constants {
       "fs.oss.multipart.download.ahead.part.max.number";
   public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
 
+  // The maximum number of copy tasks that can be queued.
+  // New copies will be blocked when the queue is full.
+  public static final String MAX_COPY_TASKS_KEY = "fs.oss.max.copy.tasks";
+  public static final int MAX_COPY_TASKS_DEFAULT = 1024 * 10240;
+
+  // The maximum number of threads allowed in the pool for copies
+  public static final String MAX_COPY_THREADS_NUM_KEY =
+      "fs.oss.max.copy.threads";
+  public static final int MAX_COPY_THREADS_DEFAULT = 25;
+
+  // The maximum number of concurrent tasks allowed when copying one directory,
+  // so that a single directory copy does not block other copies.
+  public static final String MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY =
+      "fs.oss.max.copy.tasks.per.dir";
+  public static final int MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT = 5;
+
   // Comma separated list of directories
   public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7985c5fd/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java
index 46ab339..0570146 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java
@@ -33,6 +33,8 @@ import java.io.IOException;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeNotNull;
 import static org.junit.Assume.assumeTrue;
 
@@ -133,6 +135,134 @@ public class TestAliyunOSSFileSystemContract
   }
 
   @Test
+  public void testRenameDirectoryConcurrent() throws Exception {
+    assumeTrue(renameSupported());
+    Path src = this.path("/test/hadoop/file/");
+    Path child1 = this.path("/test/hadoop/file/1");
+    Path child2 = this.path("/test/hadoop/file/2");
+    Path child3 = this.path("/test/hadoop/file/3");
+    Path child4 = this.path("/test/hadoop/file/4");
+
+    this.createFile(child1);
+    this.createFile(child2);
+    this.createFile(child3);
+    this.createFile(child4);
+
+    Path dst = this.path("/test/new");
+    super.rename(src, dst, true, false, true);
+    assertEquals(4, this.fs.listStatus(dst).length);
+  }
+
+  @Test
+  public void testRenameDirectoryCopyTaskAllSucceed() throws Exception {
+    assumeTrue(renameSupported());
+    Path srcOne = this.path("/test/hadoop/file/1");
+    this.createFile(srcOne);
+
+    Path dstOne = this.path("/test/new/file/1");
+    Path dstTwo = this.path("/test/new/file/2");
+    AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
+    AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
+    store.storeEmptyFile("test/new/file/");
+    AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstOne.toUri().getPath().substring(1), copyFileContext);
+    oneCopyFileTask.run();
+    assumeFalse(copyFileContext.isCopyFailure());
+
+    AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstTwo.toUri().getPath().substring(1), copyFileContext);
+    twoCopyFileTask.run();
+    assumeFalse(copyFileContext.isCopyFailure());
+
+    copyFileContext.lock();
+    try {
+      copyFileContext.awaitAllFinish(2);
+    } catch (InterruptedException e) {
+      throw new Exception(e);
+    } finally {
+      copyFileContext.unlock();
+    }
+    assumeFalse(copyFileContext.isCopyFailure());
+  }
+
+  @Test
+  public void testRenameDirectoryCopyTaskAllFailed() throws Exception {
+    assumeTrue(renameSupported());
+    Path srcOne = this.path("/test/hadoop/file/1");
+    this.createFile(srcOne);
+
+    Path dstOne = new Path("1");
+    Path dstTwo = new Path("2");
+    AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
+    AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
+    //store.storeEmptyFile("test/new/file/");
+    AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstOne.toUri().getPath().substring(1), copyFileContext);
+    oneCopyFileTask.run();
+    assumeTrue(copyFileContext.isCopyFailure());
+
+    AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstTwo.toUri().getPath().substring(1), copyFileContext);
+    twoCopyFileTask.run();
+    assumeTrue(copyFileContext.isCopyFailure());
+
+    copyFileContext.lock();
+    try {
+      copyFileContext.awaitAllFinish(2);
+    } catch (InterruptedException e) {
+      throw new Exception(e);
+    } finally {
+      copyFileContext.unlock();
+    }
+    assumeTrue(copyFileContext.isCopyFailure());
+  }
+
+  @Test
+  public void testRenameDirectoryCopyTaskPartialFailed() throws Exception {
+    assumeTrue(renameSupported());
+    Path srcOne = this.path("/test/hadoop/file/1");
+    this.createFile(srcOne);
+
+    Path dstOne = new Path("1");
+    Path dstTwo = new Path("/test/new/file/2");
+    Path dstThree = new Path("3");
+    AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
+    AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
+    //store.storeEmptyFile("test/new/file/");
+    AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstOne.toUri().getPath().substring(1), copyFileContext);
+    oneCopyFileTask.run();
+    assumeTrue(copyFileContext.isCopyFailure());
+
+    AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstTwo.toUri().getPath().substring(1), copyFileContext);
+    twoCopyFileTask.run();
+    assumeTrue(copyFileContext.isCopyFailure());
+
+    AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
+        store, srcOne.toUri().getPath().substring(1),
+        dstThree.toUri().getPath().substring(1), copyFileContext);
+    threeCopyFileTask.run();
+    assumeTrue(copyFileContext.isCopyFailure());
+
+    copyFileContext.lock();
+    try {
+      copyFileContext.awaitAllFinish(3);
+    } catch (InterruptedException e) {
+      throw new Exception(e);
+    } finally {
+      copyFileContext.unlock();
+    }
+    assumeTrue(copyFileContext.isCopyFailure());
+  }
+
+  @Test
   public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception {
     assumeTrue(renameSupported());
     Path src = this.path("/test/hadoop/dir");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to