HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor and Aaron Fabbri via lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff7c90a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff7c90a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff7c90a Branch: refs/heads/HDFS-8707 Commit: bff7c90a5686de106ca7a866982412c5dfa01632 Parents: 19a0c26 Author: Lei Xu <l...@apache.org> Authored: Thu Nov 5 18:33:53 2015 -0800 Committer: Lei Xu <l...@apache.org> Committed: Thu Nov 5 18:35:15 2015 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../src/main/resources/core-default.xml | 10 +- .../s3a/BlockingThreadPoolExecutorService.java | 274 +++++++++++++++++++ .../org/apache/hadoop/fs/s3a/Constants.java | 13 +- .../hadoop/fs/s3a/S3AFastOutputStream.java | 4 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 82 +----- .../src/site/markdown/tools/hadoop-aws/index.md | 10 +- .../TestBlockingThreadPoolExecutorService.java | 182 ++++++++++++ .../fs/s3a/TestS3ABlockingThreadPool.java | 80 ++++++ 9 files changed, 561 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4035ef8..25f0a8f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -951,6 +951,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder. (Kai Zheng via yliu) + HADOOP-11684. S3a to use thread pool that blocks clients. (Thomas Demoor + and Aaron Fabbri via lei) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index df597a0..1bdfe4a 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -843,18 +843,12 @@ for ldap providers in the same way as above does. <property> <name>fs.s3a.threads.max</name> - <value>256</value> + <value>10</value> <description> Maximum number of concurrent active (part)uploads, which each use a thread from the threadpool.</description> </property> <property> - <name>fs.s3a.threads.core</name> - <value>15</value> - <description>Number of core threads in the threadpool.</description> -</property> - -<property> <name>fs.s3a.threads.keepalivetime</name> <value>60</value> <description>Number of seconds a thread can be idle before being @@ -863,7 +857,7 @@ for ldap providers in the same way as above does. <property> <name>fs.s3a.max.total.tasks</name> - <value>1000</value> + <value>5</value> <description>Number of (part)uploads allowed to the queue before blocking additional uploads.</description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java new file mode 100644 index 0000000..3baf6fc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ForwardingListeningExecutorService; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * This ExecutorService blocks the submission of new tasks when its queue is + * already full by using a semaphore. Task submissions require permits, task + * completions release permits. + * <p> + * This is inspired by <a href = "https://github + * .com/apache/incubator-s4/blob/master/subprojects + * /s4-comm/src/main/java/org/apache/s4/comm/staging + * /BlockingThreadPoolExecutorService.java"> this s4 threadpool</a> + */ +public class BlockingThreadPoolExecutorService + extends ForwardingListeningExecutorService { + + private static Logger LOG = LoggerFactory + .getLogger(BlockingThreadPoolExecutorService.class); + + private Semaphore queueingPermits; + private ListeningExecutorService executorDelegatee; + + private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); + + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each + * created thread uniquely, + * with a common prefix. + * + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + public static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + + return new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final int poolNum = POOLNUMBER.getAndIncrement(); + private final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = + prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; + } + + /** + * Get a named {@link ThreadFactory} that just builds daemon threads. + * + * @param prefix name prefix for all threads created from the factory + * @return a thread factory that creates named, daemon threads with + * the supplied exception handler and normal priority + */ + private static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + + }; + } + + + /** + * A thread pool that that blocks clients submitting additional tasks if + * there are already {@code activeTasks} running threads and {@code + * waitingTasks} tasks waiting in its queue. + * + * @param activeTasks maximum number of active tasks + * @param waitingTasks maximum number of waiting tasks + * @param keepAliveTime time until threads are cleaned up in {@code unit} + * @param unit time unit + * @param prefixName prefix of name for threads + */ + public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks, + long keepAliveTime, TimeUnit unit, String prefixName) { + super(); + queueingPermits = new Semaphore(waitingTasks + activeTasks, false); + /* Although we generally only expect up to waitingTasks tasks in the + queue, we need to be able to buffer all tasks in case dequeueing is + slower than enqueueing. */ + final BlockingQueue<Runnable> workQueue = + new LinkedBlockingQueue<>(waitingTasks + activeTasks); + ThreadPoolExecutor eventProcessingExecutor = + new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit, + workQueue, newDaemonThreadFactory(prefixName), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, + ThreadPoolExecutor executor) { + // This is not expected to happen. + LOG.error("Could not submit task to executor {}", + executor.toString()); + } + }); + eventProcessingExecutor.allowCoreThreadTimeOut(true); + executorDelegatee = + MoreExecutors.listeningDecorator(eventProcessingExecutor); + + } + + @Override + protected ListeningExecutorService delegate() { + return executorDelegatee; + } + + @Override + public <T> ListenableFuture<T> submit(Callable<T> task) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new CallableWithPermitRelease<T>(task)); + } + + @Override + public <T> ListenableFuture<T> submit(Runnable task, T result) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new RunnableWithPermitRelease(task), result); + } + + @Override + public ListenableFuture<?> submit(Runnable task) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new RunnableWithPermitRelease(task)); + } + + @Override + public void execute(Runnable command) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.execute(new RunnableWithPermitRelease(command)); + } + + /** + * Releases a permit after the task is executed. + */ + class RunnableWithPermitRelease implements Runnable { + + private Runnable delegatee; + + public RunnableWithPermitRelease(Runnable delegatee) { + this.delegatee = delegatee; + } + + @Override + public void run() { + try { + delegatee.run(); + } finally { + queueingPermits.release(); + } + + } + } + + /** + * Releases a permit after the task is completed. + */ + class CallableWithPermitRelease<T> implements Callable<T> { + + private Callable<T> delegatee; + + public CallableWithPermitRelease(Callable<T> delegatee) { + this.delegatee = delegatee; + } + + @Override + public T call() throws Exception { + try { + return delegatee.call(); + } finally { + queueingPermits.release(); + } + } + + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new RuntimeException("Not implemented"); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index fa81d93..99e85cf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -61,20 +61,15 @@ public class Constants { // the maximum number of threads to allow in the pool used by TransferManager public static final String MAX_THREADS = "fs.s3a.threads.max"; - public static final int DEFAULT_MAX_THREADS = 256; + public static final int DEFAULT_MAX_THREADS = 10; - // the number of threads to keep in the pool used by TransferManager - public static final String CORE_THREADS = "fs.s3a.threads.core"; - public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS; - - // when the number of threads is greater than the core, this is the maximum time - // that excess idle threads will wait for new tasks before terminating. + // the time an idle thread waits before terminating public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime"; public static final int DEFAULT_KEEPALIVE_TIME = 60; - // the maximum number of tasks that the LinkedBlockingQueue can hold + // the maximum number of tasks cached if all threads are already uploading public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks"; - public static final int DEFAULT_MAX_TOTAL_TASKS = 1000; + public static final int DEFAULT_MAX_TOTAL_TASKS = 5; // size of each of or multipart pieces in bytes public static final String MULTIPART_SIZE = "fs.s3a.multipart.size"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 2e06fba..5558693 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -51,7 +51,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; /** @@ -108,7 +108,7 @@ public class S3AFastOutputStream extends OutputStream { String bucket, String key, Progressable progress, FileSystem.Statistics statistics, CannedAccessControlList cannedACL, String serverSideEncryptionAlgorithm, long partSize, - long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor) + long multiPartThreshold, ExecutorService threadPoolExecutor) throws IOException { this.bucket = bucket; this.key = key; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 83be184..ccb0cd2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -26,11 +26,8 @@ import java.net.URI; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; @@ -86,7 +83,7 @@ public class S3AFileSystem extends FileSystem { private int maxKeys; private long partSize; private TransferManager transfers; - private ThreadPoolExecutor threadPoolExecutor; + private ExecutorService threadPoolExecutor; private long multiPartThreshold; public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; @@ -95,55 +92,6 @@ public class S3AFileSystem extends FileSystem { // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; - private static final AtomicInteger poolNumber = new AtomicInteger(1); - /** - * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, - * with a common prefix. - * @param prefix The prefix of every created Thread's name - * @return a {@link java.util.concurrent.ThreadFactory} that names threads - */ - public static ThreadFactory getNamedThreadFactory(final String prefix) { - SecurityManager s = System.getSecurityManager(); - final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() - .getThreadGroup(); - - return new ThreadFactory() { - final AtomicInteger threadNumber = new AtomicInteger(1); - private final int poolNum = poolNumber.getAndIncrement(); - final ThreadGroup group = threadGroup; - - @Override - public Thread newThread(Runnable r) { - final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); - return new Thread(group, r, name); - } - }; - } - - /** - * Get a named {@link ThreadFactory} that just builds daemon threads. - * @param prefix name prefix for all threads created from the factory - * @return a thread factory that creates named, daemon threads with - * the supplied exception handler and normal priority - */ - private static ThreadFactory newDaemonThreadFactory(final String prefix) { - final ThreadFactory namedFactory = getNamedThreadFactory(prefix); - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = namedFactory.newThread(r); - if (!t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - - }; - } - /** Called after a new FileSystem instance is constructed. * @param name a uri whose authority section names the host, port, etc. * for this FileSystem @@ -264,25 +212,19 @@ public class S3AFileSystem extends FileSystem { } int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); - int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS); - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; + if (maxThreads < 2) { + LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2."); + maxThreads = 2; } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; + int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS); + if (totalTasks < 1) { + LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1."); + totalTasks = 1; } long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); - LinkedBlockingQueue<Runnable> workQueue = - new LinkedBlockingQueue<>(maxThreads * - conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); - threadPoolExecutor = new ThreadPoolExecutor( - coreThreads, - maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - newDaemonThreadFactory("s3a-transfer-shared-")); - threadPoolExecutor.allowCoreThreadTimeOut(true); + threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads, + maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS, + "s3a-transfer-shared"); TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); transferConfiguration.setMinimumUploadPartSize(partSize); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 6df15e6..0260e26 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -231,18 +231,12 @@ If you do any of these: change your credentials immediately! <property> <name>fs.s3a.threads.max</name> - <value>256</value> + <value>10</value> <description> Maximum number of concurrent active (part)uploads, which each use a thread from the threadpool.</description> </property> <property> - <name>fs.s3a.threads.core</name> - <value>15</value> - <description>Number of core threads in the threadpool.</description> - </property> - - <property> <name>fs.s3a.threads.keepalivetime</name> <value>60</value> <description>Number of seconds a thread can be idle before being @@ -251,7 +245,7 @@ If you do any of these: change your credentials immediately! <property> <name>fs.s3a.max.total.tasks</name> - <value>1000</value> + <value>5</value> <description>Number of (part)uploads allowed to the queue before blocking additional uploads.</description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java new file mode 100644 index 0000000..25a8958 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.util.StopWatch; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Basic unit test for S3A's blocking executor service. + */ +public class TestBlockingThreadPoolExecutorService { + + private static final Logger LOG = LoggerFactory.getLogger( + BlockingThreadPoolExecutorService.class); + + private static final int NUM_ACTIVE_TASKS = 4; + private static final int NUM_WAITING_TASKS = 2; + private static final int TASK_SLEEP_MSEC = 100; + private static final int SHUTDOWN_WAIT_MSEC = 200; + private static final int SHUTDOWN_WAIT_TRIES = 5; + private static final int BLOCKING_THRESHOLD_MSEC = 50; + + private static final Integer SOME_VALUE = 1337; + + private static BlockingThreadPoolExecutorService tpe = null; + + @AfterClass + public static void afterClass() throws Exception { + ensureDestroyed(); + } + + /** + * Basic test of running one trivial task. + */ + @Test + public void testSubmitCallable() throws Exception { + ensureCreated(); + ListenableFuture<Integer> f = tpe.submit(callableSleeper); + Integer v = f.get(); + assertEquals(SOME_VALUE, v); + } + + /** + * More involved test, including detecting blocking when at capacity. + */ + @Test + public void testSubmitRunnable() throws Exception { + ensureCreated(); + int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS; + StopWatch stopWatch = new StopWatch().start(); + for (int i = 0; i < totalTasks; i++) { + tpe.submit(sleeper); + assertDidntBlock(stopWatch); + } + tpe.submit(sleeper); + assertDidBlock(stopWatch); + } + + @Test + public void testShutdown() throws Exception { + // Cover create / destroy, regardless of when this test case runs + ensureCreated(); + ensureDestroyed(); + + // Cover create, execute, destroy, regardless of when test case runs + ensureCreated(); + testSubmitRunnable(); + ensureDestroyed(); + } + + // Helper functions, etc. + + private void assertDidntBlock(StopWatch sw) { + try { + assertFalse("Non-blocking call took too long.", + sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC); + } finally { + sw.reset().start(); + } + } + + private void assertDidBlock(StopWatch sw) { + try { + if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) { + throw new RuntimeException("Blocking call returned too fast."); + } + } finally { + sw.reset().start(); + } + } + + private Runnable sleeper = new Runnable() { + @Override + public void run() { + String name = Thread.currentThread().getName(); + try { + Thread.sleep(TASK_SLEEP_MSEC); + } catch (InterruptedException e) { + LOG.info("Thread {} interrupted.", name); + Thread.currentThread().interrupt(); + } + } + }; + + private Callable<Integer> callableSleeper = new Callable<Integer>() { + @Override + public Integer call() throws Exception { + sleeper.run(); + return SOME_VALUE; + } + }; + + /** + * Helper function to create thread pool under test. + */ + private static void ensureCreated() throws Exception { + if (tpe == null) { + LOG.debug("Creating thread pool"); + tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS, + NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest"); + } + } + + /** + * Helper function to terminate thread pool under test, asserting that + * shutdown -> terminate works as expected. + */ + private static void ensureDestroyed() throws Exception { + if (tpe == null) { + return; + } + int shutdownTries = SHUTDOWN_WAIT_TRIES; + + tpe.shutdown(); + if (!tpe.isShutdown()) { + throw new RuntimeException("Shutdown had no effect."); + } + + while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, + TimeUnit.MILLISECONDS)) { + LOG.info("Waiting for thread pool shutdown."); + if (shutdownTries-- <= 0) { + LOG.error("Failed to terminate thread pool gracefully."); + break; + } + } + if (!tpe.isTerminated()) { + tpe.shutdownNow(); + if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC, + TimeUnit.MILLISECONDS)) { + throw new RuntimeException( + "Failed to terminate thread pool in timely manner."); + } + } + tpe = null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff7c90a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java new file mode 100644 index 0000000..bd738b2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +/** + * Demonstrate that the threadpool blocks additional client requests if + * its queue is full (rather than throwing an exception) by initiating an + * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The + * 4th part should not trigger an exception as it would with a + * non-blocking threadpool. + */ +public class TestS3ABlockingThreadPool { + + private Configuration conf; + private S3AFileSystem fs; + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + protected Path getTestPath() { + return new Path("/tests3a"); + } + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024); + conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024); + conf.setInt(Constants.MAX_THREADS, 2); + conf.setInt(Constants.MAX_TOTAL_TASKS, 1); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(getTestPath(), true); + } + } + + @Test + public void testRegularMultiPartUpload() throws Exception { + fs = S3ATestUtils.createTestFileSystem(conf); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * + 1024); + } + + @Test + public void testFastMultiPartUpload() throws Exception { + conf.setBoolean(Constants.FAST_UPLOAD, true); + fs = S3ATestUtils.createTestFileSystem(conf); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 * + 1024); + + } +}