HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor and 
Aaron Fabbri via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff7c90a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff7c90a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff7c90a

Branch: refs/heads/HDFS-8707
Commit: bff7c90a5686de106ca7a866982412c5dfa01632
Parents: 19a0c26
Author: Lei Xu <l...@apache.org>
Authored: Thu Nov 5 18:33:53 2015 -0800
Committer: Lei Xu <l...@apache.org>
Committed: Thu Nov 5 18:35:15 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../src/main/resources/core-default.xml         |  10 +-
 .../s3a/BlockingThreadPoolExecutorService.java  | 274 +++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/Constants.java     |  13 +-
 .../hadoop/fs/s3a/S3AFastOutputStream.java      |   4 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  82 +-----
 .../src/site/markdown/tools/hadoop-aws/index.md |  10 +-
 .../TestBlockingThreadPoolExecutorService.java  | 182 ++++++++++++
 .../fs/s3a/TestS3ABlockingThreadPool.java       |  80 ++++++
 9 files changed, 561 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index 4035ef8..25f0a8f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -951,6 +951,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder.
     (Kai Zheng via yliu)
 
+    HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor
+    and Aaron Fabbri via lei)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index df597a0..1bdfe4a 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -843,18 +843,12 @@ for ldap providers in the same way as above does.
 
 <property>
   <name>fs.s3a.threads.max</name>
-  <value>256</value>
+  <value>10</value>
   <description> Maximum number of concurrent active (part)uploads,
     which each use a thread from the threadpool.</description>
 </property>
 
 <property>
-  <name>fs.s3a.threads.core</name>
-  <value>15</value>
-  <description>Number of core threads in the threadpool.</description>
-</property>
-
-<property>
   <name>fs.s3a.threads.keepalivetime</name>
   <value>60</value>
   <description>Number of seconds a thread can be idle before being
@@ -863,7 +857,7 @@ for ldap providers in the same way as above does.
 
 <property>
   <name>fs.s3a.max.total.tasks</name>
-  <value>1000</value>
+  <value>5</value>
   <description>Number of (part)uploads allowed to the queue before
     blocking additional uploads.</description>
 </property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
new file mode 100644
index 0000000..3baf6fc
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is inspired by <a href = "https://github
+ * .com/apache/incubator-s4/blob/master/subprojects
+ * /s4-comm/src/main/java/org/apache/s4/comm/staging
+ * /BlockingThreadPoolExecutorService.java"> this s4 threadpool</a>
+ */
+public class BlockingThreadPoolExecutorService
+    extends ForwardingListeningExecutorService {
+
+  private static Logger LOG = LoggerFactory
+      .getLogger(BlockingThreadPoolExecutorService.class);
+
+  private Semaphore queueingPermits;
+  private ListeningExecutorService executorDelegatee;
+
+  private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
+
+  /**
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+   * created thread uniquely,
+   * with a common prefix.
+   *
+   * @param prefix The prefix of every created Thread's name
+   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+   */
+  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+    SecurityManager s = System.getSecurityManager();
+    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
+        Thread.currentThread().getThreadGroup();
+
+    return new ThreadFactory() {
+      private final AtomicInteger threadNumber = new AtomicInteger(1);
+      private final int poolNum = POOLNUMBER.getAndIncrement();
+      private final ThreadGroup group = threadGroup;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        final String name =
+            prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+        return new Thread(group, r, name);
+      }
+    };
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
+   *
+   * @param prefix name prefix for all threads created from the factory
+   * @return a thread factory that creates named, daemon threads with
+   * the supplied exception handler and normal priority
+   */
+  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+    return new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = namedFactory.newThread(r);
+        if (!t.isDaemon()) {
+          t.setDaemon(true);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+          t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+      }
+
+    };
+  }
+
+
+  /**
+   * A thread pool that that blocks clients submitting additional tasks if
+   * there are already {@code activeTasks} running threads and {@code
+   * waitingTasks} tasks waiting in its queue.
+   *
+   * @param activeTasks maximum number of active tasks
+   * @param waitingTasks maximum number of waiting tasks
+   * @param keepAliveTime time until threads are cleaned up in {@code unit}
+   * @param unit time unit
+   * @param prefixName prefix of name for threads
+   */
+  public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
+      long keepAliveTime, TimeUnit unit, String prefixName) {
+    super();
+    queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
+    /* Although we generally only expect up to waitingTasks tasks in the
+    queue, we need to be able to buffer all tasks in case dequeueing is
+    slower than enqueueing. */
+    final BlockingQueue<Runnable> workQueue =
+        new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+    ThreadPoolExecutor eventProcessingExecutor =
+        new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
+            workQueue, newDaemonThreadFactory(prefixName),
+            new RejectedExecutionHandler() {
+              @Override
+              public void rejectedExecution(Runnable r,
+                  ThreadPoolExecutor executor) {
+                // This is not expected to happen.
+                LOG.error("Could not submit task to executor {}",
+                    executor.toString());
+              }
+            });
+    eventProcessingExecutor.allowCoreThreadTimeOut(true);
+    executorDelegatee =
+        MoreExecutors.listeningDecorator(eventProcessingExecutor);
+
+  }
+
+  @Override
+  protected ListeningExecutorService delegate() {
+    return executorDelegatee;
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Callable<T> task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new CallableWithPermitRelease<T>(task));
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Runnable task, T result) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task), result);
+  }
+
+  @Override
+  public ListenableFuture<?> submit(Runnable task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task));
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    super.execute(new RunnableWithPermitRelease(command));
+  }
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  class RunnableWithPermitRelease implements Runnable {
+
+    private Runnable delegatee;
+
+    public RunnableWithPermitRelease(Runnable delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public void run() {
+      try {
+        delegatee.run();
+      } finally {
+        queueingPermits.release();
+      }
+
+    }
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  class CallableWithPermitRelease<T> implements Callable<T> {
+
+    private Callable<T> delegatee;
+
+    public CallableWithPermitRelease(Callable<T> delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public T call() throws Exception {
+      try {
+        return delegatee.call();
+      } finally {
+        queueingPermits.release();
+      }
+    }
+
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+      long timeout, TimeUnit unit) throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+      TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    throw new RuntimeException("Not implemented");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index fa81d93..99e85cf 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -61,20 +61,15 @@ public class Constants {
 
   // the maximum number of threads to allow in the pool used by TransferManager
   public static final String MAX_THREADS = "fs.s3a.threads.max";
-  public static final int DEFAULT_MAX_THREADS = 256;
+  public static final int DEFAULT_MAX_THREADS = 10;
 
-  // the number of threads to keep in the pool used by TransferManager
-  public static final String CORE_THREADS = "fs.s3a.threads.core";
-  public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
-
-  // when the number of threads is greater than the core, this is the maximum 
time
-  // that excess idle threads will wait for new tasks before terminating.
+  // the time an idle thread waits before terminating
   public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
   public static final int DEFAULT_KEEPALIVE_TIME = 60;
 
-  // the maximum number of tasks that the LinkedBlockingQueue can hold
+  // the maximum number of tasks cached if all threads are already uploading
   public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
-  public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
+  public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
 
   // size of each of or multipart pieces in bytes
   public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 2e06fba..5558693 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -51,7 +51,7 @@ import java.util.List;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 
 
 /**
@@ -108,7 +108,7 @@ public class S3AFastOutputStream extends OutputStream {
       String bucket, String key, Progressable progress,
       FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
       String serverSideEncryptionAlgorithm, long partSize,
-      long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
+      long multiPartThreshold, ExecutorService threadPoolExecutor)
       throws IOException {
     this.bucket = bucket;
     this.key = key;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 83be184..ccb0cd2 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -26,11 +26,8 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
@@ -86,7 +83,7 @@ public class S3AFileSystem extends FileSystem {
   private int maxKeys;
   private long partSize;
   private TransferManager transfers;
-  private ThreadPoolExecutor threadPoolExecutor;
+  private ExecutorService threadPoolExecutor;
   private long multiPartThreshold;
   public static final Logger LOG = 
LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
@@ -95,55 +92,6 @@ public class S3AFileSystem extends FileSystem {
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
 
-  private static final AtomicInteger poolNumber = new AtomicInteger(1);
-  /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each 
created thread uniquely,
-   * with a common prefix.
-   * @param prefix The prefix of every created Thread's name
-   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
-   */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
-    SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : 
Thread.currentThread()
-        .getThreadGroup();
-
-    return new ThreadFactory() {
-      final AtomicInteger threadNumber = new AtomicInteger(1);
-      private final int poolNum = poolNumber.getAndIncrement();
-      final ThreadGroup group = threadGroup;
-
-      @Override
-      public Thread newThread(Runnable r) {
-        final String name = prefix + "-pool" + poolNum + "-t" + 
threadNumber.getAndIncrement();
-        return new Thread(group, r, name);
-      }
-    };
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
-   * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads with
-   *         the supplied exception handler and normal priority
-   */
-  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
-    return new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = namedFactory.newThread(r);
-        if (!t.isDaemon()) {
-          t.setDaemon(true);
-        }
-        if (t.getPriority() != Thread.NORM_PRIORITY) {
-          t.setPriority(Thread.NORM_PRIORITY);
-        }
-        return t;
-      }
-
-    };
-  }
-
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
    *   for this FileSystem
@@ -264,25 +212,19 @@ public class S3AFileSystem extends FileSystem {
     }
 
     int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
-    int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+    if (maxThreads < 2) {
+      LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+      maxThreads = 2;
     }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+    int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
+    if (totalTasks < 1) {
+      LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
+      totalTasks = 1;
     }
     long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
-    LinkedBlockingQueue<Runnable> workQueue =
-      new LinkedBlockingQueue<>(maxThreads *
-        conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
-    threadPoolExecutor = new ThreadPoolExecutor(
-        coreThreads,
-        maxThreads,
-        keepAliveTime,
-        TimeUnit.SECONDS,
-        workQueue,
-        newDaemonThreadFactory("s3a-transfer-shared-"));
-    threadPoolExecutor.allowCoreThreadTimeOut(true);
+    threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
+        maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+        "s3a-transfer-shared");
 
     TransferManagerConfiguration transferConfiguration = new 
TransferManagerConfiguration();
     transferConfiguration.setMinimumUploadPartSize(partSize);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 6df15e6..0260e26 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -231,18 +231,12 @@ If you do any of these: change your credentials 
immediately!
 
     <property>
       <name>fs.s3a.threads.max</name>
-      <value>256</value>
+      <value>10</value>
       <description> Maximum number of concurrent active (part)uploads,
       which each use a thread from the threadpool.</description>
     </property>
 
     <property>
-      <name>fs.s3a.threads.core</name>
-      <value>15</value>
-      <description>Number of core threads in the threadpool.</description>
-    </property>
-
-    <property>
       <name>fs.s3a.threads.keepalivetime</name>
       <value>60</value>
       <description>Number of seconds a thread can be idle before being
@@ -251,7 +245,7 @@ If you do any of these: change your credentials immediately!
 
     <property>
       <name>fs.s3a.max.total.tasks</name>
-      <value>1000</value>
+      <value>5</value>
       <description>Number of (part)uploads allowed to the queue before
       blocking additional uploads.</description>
     </property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
new file mode 100644
index 0000000..25a8958
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.StopWatch;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Basic unit test for S3A's blocking executor service.
+ */
+public class TestBlockingThreadPoolExecutorService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockingThreadPoolExecutorService.class);
+
+  private static final int NUM_ACTIVE_TASKS = 4;
+  private static final int NUM_WAITING_TASKS = 2;
+  private static final int TASK_SLEEP_MSEC = 100;
+  private static final int SHUTDOWN_WAIT_MSEC = 200;
+  private static final int SHUTDOWN_WAIT_TRIES = 5;
+  private static final int BLOCKING_THRESHOLD_MSEC = 50;
+
+  private static final Integer SOME_VALUE = 1337;
+
+  private static BlockingThreadPoolExecutorService tpe = null;
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ensureDestroyed();
+  }
+
+  /**
+   * Basic test of running one trivial task.
+   */
+  @Test
+  public void testSubmitCallable() throws Exception {
+    ensureCreated();
+    ListenableFuture<Integer> f = tpe.submit(callableSleeper);
+    Integer v = f.get();
+    assertEquals(SOME_VALUE, v);
+  }
+
+  /**
+   * More involved test, including detecting blocking when at capacity.
+   */
+  @Test
+  public void testSubmitRunnable() throws Exception {
+    ensureCreated();
+    int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
+    StopWatch stopWatch = new StopWatch().start();
+    for (int i = 0; i < totalTasks; i++) {
+      tpe.submit(sleeper);
+      assertDidntBlock(stopWatch);
+    }
+    tpe.submit(sleeper);
+    assertDidBlock(stopWatch);
+  }
+
+  @Test
+  public void testShutdown() throws Exception {
+    // Cover create / destroy, regardless of when this test case runs
+    ensureCreated();
+    ensureDestroyed();
+
+    // Cover create, execute, destroy, regardless of when test case runs
+    ensureCreated();
+    testSubmitRunnable();
+    ensureDestroyed();
+  }
+
+  // Helper functions, etc.
+
+  private void assertDidntBlock(StopWatch sw) {
+    try {
+      assertFalse("Non-blocking call took too long.",
+          sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
+    } finally {
+      sw.reset().start();
+    }
+  }
+
+  private void assertDidBlock(StopWatch sw) {
+    try {
+      if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
+        throw new RuntimeException("Blocking call returned too fast.");
+      }
+    } finally {
+      sw.reset().start();
+    }
+  }
+
+  private Runnable sleeper = new Runnable() {
+    @Override
+    public void run() {
+      String name = Thread.currentThread().getName();
+      try {
+        Thread.sleep(TASK_SLEEP_MSEC);
+      } catch (InterruptedException e) {
+        LOG.info("Thread {} interrupted.", name);
+        Thread.currentThread().interrupt();
+      }
+    }
+  };
+
+  private Callable<Integer> callableSleeper = new Callable<Integer>() {
+    @Override
+    public Integer call() throws Exception {
+      sleeper.run();
+      return SOME_VALUE;
+    }
+  };
+
+  /**
+   * Helper function to create thread pool under test.
+   */
+  private static void ensureCreated() throws Exception {
+    if (tpe == null) {
+      LOG.debug("Creating thread pool");
+      tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
+          NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
+    }
+  }
+
+  /**
+   * Helper function to terminate thread pool under test, asserting that
+   * shutdown -> terminate works as expected.
+   */
+  private static void ensureDestroyed() throws Exception {
+    if (tpe == null) {
+      return;
+    }
+    int shutdownTries = SHUTDOWN_WAIT_TRIES;
+
+    tpe.shutdown();
+    if (!tpe.isShutdown()) {
+      throw new RuntimeException("Shutdown had no effect.");
+    }
+
+    while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+        TimeUnit.MILLISECONDS)) {
+      LOG.info("Waiting for thread pool shutdown.");
+      if (shutdownTries-- <= 0) {
+        LOG.error("Failed to terminate thread pool gracefully.");
+        break;
+      }
+    }
+    if (!tpe.isTerminated()) {
+      tpe.shutdownNow();
+      if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+          TimeUnit.MILLISECONDS)) {
+        throw new RuntimeException(
+            "Failed to terminate thread pool in timely manner.");
+      }
+    }
+    tpe = null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
new file mode 100644
index 0000000..bd738b2
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Demonstrate that the threadpool blocks additional client requests if
+ * its queue is full (rather than throwing an exception) by initiating an
+ * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
+ * 4th part should not trigger an exception as it would with a
+ * non-blocking threadpool.
+ */
+public class TestS3ABlockingThreadPool {
+
+  private Configuration conf;
+  private S3AFileSystem fs;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  protected Path getTestPath() {
+    return new Path("/tests3a");
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+    conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+    conf.setInt(Constants.MAX_THREADS, 2);
+    conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(getTestPath(), true);
+    }
+  }
+
+  @Test
+  public void testRegularMultiPartUpload() throws Exception {
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+        1024);
+  }
+
+  @Test
+  public void testFastMultiPartUpload() throws Exception {
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+        1024);
+
+  }
+}

Reply via email to