HADOOP-13560. S3ABlockOutputStream to support huge (many GB) file writes. Contributed by Steve Loughran
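A minimal usage sketch, assuming the new configuration keys this patch introduces (fs.s3a.fast.upload, fs.s3a.fast.upload.buffer, fs.s3a.fast.upload.active.blocks, together with the existing fs.s3a.multipart.size); the bucket name, path and sizes below are illustrative placeholders and are not part of the commit:

    // Enable the incremental block-based upload path (S3ABlockOutputStream)
    // through the options added by this change; all values are examples only.
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3AFastUploadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.fast.upload", true);             // switch to the block output stream
        conf.set("fs.s3a.fast.upload.buffer", "disk");            // disk | array | bytebuffer
        conf.setInt("fs.s3a.fast.upload.active.blocks", 4);       // cap blocks queued per stream
        conf.setLong("fs.s3a.multipart.size", 64L * 1024 * 1024); // block/partition size, >= 5 MB

        // "s3a://example-bucket" is a placeholder; substitute a real bucket.
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
        try (FSDataOutputStream out = fs.create(new Path("/scale/huge-file.bin"))) {
          byte[] chunk = new byte[1024 * 1024];
          for (int i = 0; i < 8 * 1024; i++) {    // ~8 GB written; parts upload as blocks fill
            out.write(chunk);
          }
        }
      }
    }

With disk buffering, each block is spilled to a directory from fs.s3a.buffer.dir before upload, so heap usage stays bounded even for many-GB writes; the array and bytebuffer options trade disk I/O for on-heap or off-heap memory limited by fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks per stream.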
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c348c56 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c348c56 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c348c56 Branch: refs/heads/trunk Commit: 6c348c56918973fd988b110e79231324a8befe12 Parents: b733a6f Author: Steve Loughran <ste...@apache.org> Authored: Tue Oct 18 19:33:38 2016 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Tue Oct 18 21:16:02 2016 +0100 ---------------------------------------------------------------------- .../src/main/resources/core-default.xml | 74 +- .../hadoop/fs/contract/ContractTestUtils.java | 16 +- hadoop-tools/hadoop-aws/pom.xml | 58 +- .../s3a/BlockingThreadPoolExecutorService.java | 168 +--- .../org/apache/hadoop/fs/s3a/Constants.java | 71 +- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 703 ++++++++++++++++ .../org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 821 +++++++++++++++++++ .../hadoop/fs/s3a/S3AFastOutputStream.java | 410 --------- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 408 +++++++-- .../hadoop/fs/s3a/S3AInstrumentation.java | 248 +++++- .../apache/hadoop/fs/s3a/S3AOutputStream.java | 57 +- .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 39 + .../fs/s3a/SemaphoredDelegatingExecutor.java | 230 ++++++ .../org/apache/hadoop/fs/s3a/Statistic.java | 32 +- .../src/site/markdown/tools/hadoop-aws/index.md | 668 +++++++++++++-- .../fs/contract/s3a/ITestS3AContractDistCp.java | 10 +- .../hadoop/fs/s3a/AbstractS3ATestBase.java | 1 + .../ITestBlockingThreadPoolExecutorService.java | 48 +- .../hadoop/fs/s3a/ITestS3ABlockOutputArray.java | 90 ++ .../fs/s3a/ITestS3ABlockOutputByteBuffer.java | 30 + .../hadoop/fs/s3a/ITestS3ABlockOutputDisk.java | 30 + .../fs/s3a/ITestS3ABlockingThreadPool.java | 2 + .../hadoop/fs/s3a/ITestS3AConfiguration.java | 29 + .../ITestS3AEncryptionBlockOutputStream.java | 36 + .../s3a/ITestS3AEncryptionFastOutputStream.java | 35 - .../hadoop/fs/s3a/ITestS3AFastOutputStream.java | 74 -- .../apache/hadoop/fs/s3a/ITestS3ATestUtils.java | 98 +++ .../apache/hadoop/fs/s3a/S3ATestConstants.java | 75 +- .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 148 +++- .../apache/hadoop/fs/s3a/TestDataBlocks.java | 124 +++ .../ITestS3AFileContextStatistics.java | 1 + .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 412 ++++++++++ .../fs/s3a/scale/ITestS3ADeleteManyFiles.java | 19 +- .../s3a/scale/ITestS3AHugeFilesArrayBlocks.java | 31 + .../ITestS3AHugeFilesByteBufferBlocks.java | 34 + .../scale/ITestS3AHugeFilesClassicOutput.java | 41 + .../s3a/scale/ITestS3AHugeFilesDiskBlocks.java | 31 + .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 151 ++-- 38 files changed, 4647 insertions(+), 906 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4882728..daa421c 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -994,8 +994,8 @@ <property> <name>fs.s3a.threads.max</name> <value>10</value> - <description> Maximum number of concurrent active (part)uploads, - which each use a thread from 
the threadpool.</description> + <description>The total number of threads available in the filesystem for data + uploads *or any other queued filesystem operation*.</description> </property> <property> @@ -1008,8 +1008,7 @@ <property> <name>fs.s3a.max.total.tasks</name> <value>5</value> - <description>Number of (part)uploads allowed to the queue before - blocking additional uploads.</description> + <description>The number of operations which can be queued for execution</description> </property> <property> @@ -1047,13 +1046,21 @@ <name>fs.s3a.multipart.purge</name> <value>false</value> <description>True if you want to purge existing multipart uploads that may not have been - completed/aborted correctly</description> + completed/aborted correctly. The corresponding purge age is defined in + fs.s3a.multipart.purge.age. + If set, when the filesystem is instantiated then all outstanding uploads + older than the purge age will be terminated -across the entire bucket. + This will impact multipart uploads by other applications and users. so should + be used sparingly, with an age value chosen to stop failed uploads, without + breaking ongoing operations. + </description> </property> <property> <name>fs.s3a.multipart.purge.age</name> <value>86400</value> - <description>Minimum age in seconds of multipart uploads to purge</description> + <description>Minimum age in seconds of multipart uploads to purge. + </description> </property> <property> @@ -1086,10 +1093,50 @@ <property> <name>fs.s3a.fast.upload</name> <value>false</value> - <description>Upload directly from memory instead of buffering to - disk first. Memory usage and parallelism can be controlled as up to - fs.s3a.multipart.size memory is consumed for each (part)upload actively - uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description> + <description> + Use the incremental block-based fast upload mechanism with + the buffering mechanism set in fs.s3a.fast.upload.buffer. + </description> +</property> + +<property> + <name>fs.s3a.fast.upload.buffer</name> + <value>disk</value> + <description> + The buffering mechanism to use when using S3A fast upload + (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer. + This configuration option has no effect if fs.s3a.fast.upload is false. + + "disk" will use the directories listed in fs.s3a.buffer.dir as + the location(s) to save data prior to being uploaded. + + "array" uses arrays in the JVM heap + + "bytebuffer" uses off-heap memory within the JVM. + + Both "array" and "bytebuffer" will consume memory in a single stream up to the number + of blocks set by: + + fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks. + + If using either of these mechanisms, keep this value low + + The total number of threads performing work across all threads is set by + fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued + work items. + </description> +</property> + +<property> + <name>fs.s3a.fast.upload.active.blocks</name> + <value>4</value> + <description> + Maximum Number of blocks a single output stream can have + active (uploading, or queued to the central FileSystem + instance's pool of queued operations. + + This stops a single stream overloading the shared thread pool. + </description> </property> <property> @@ -1101,13 +1148,6 @@ </property> <property> - <name>fs.s3a.fast.buffer.size</name> - <value>1048576</value> - <description>Size of initial memory buffer in bytes allocated for an - upload. 
No effect if fs.s3a.fast.upload is false.</description> -</property> - -<property> <name>fs.s3a.user.agent.prefix</name> <value></value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 03f47c1..16bfb9a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -965,7 +965,7 @@ public class ContractTestUtils extends Assert { * @return the number of megabytes/second of the recorded operation */ public static double bandwidthMBs(long bytes, long durationNS) { - return (bytes * 1000.0) / durationNS; + return bytes / (1024.0 * 1024) * 1.0e9 / durationNS; } /** @@ -1415,6 +1415,14 @@ public class ContractTestUtils extends Assert { return endTime - startTime; } + /** + * Intermediate duration of the operation. + * @return how much time has passed since the start (in nanos). + */ + public long elapsedTime() { + return now() - startTime; + } + public double bandwidth(long bytes) { return bandwidthMBs(bytes, duration()); } @@ -1422,10 +1430,12 @@ public class ContractTestUtils extends Assert { /** * Bandwidth as bytes per second. * @param bytes bytes in - * @return the number of bytes per second this operation timed. + * @return the number of bytes per second this operation. + * 0 if duration == 0. */ public double bandwidthBytes(long bytes) { - return (bytes * 1.0) / duration(); + double duration = duration(); + return duration > 0 ? bytes / duration : 0; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 1c1bb02..1f9a6ff 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -35,6 +35,15 @@ <file.encoding>UTF-8</file.encoding> <downloadSources>true</downloadSources> <hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir> + + <!-- are scale tests enabled ? --> + <fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled> + <!-- Size in MB of huge files. --> + <fs.s3a.scale.test.huge.filesize>unset</fs.s3a.scale.test.huge.filesize> + <!-- Size in MB of the partion size in huge file uploads. --> + <fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize> + <!-- Timeout in seconds for scale tests.--> + <fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout> </properties> <profiles> @@ -115,6 +124,11 @@ <!-- substitution. Putting a prefix in front of it like --> <!-- "fork-" makes it work. 
--> <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id> + <!-- Propagate scale parameters --> + <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> + <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> + <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize> + <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> </systemPropertyVariables> </configuration> </plugin> @@ -132,7 +146,10 @@ <forkCount>${testsThreadCount}</forkCount> <reuseForks>false</reuseForks> <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine> + <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds> <systemPropertyVariables> + <!-- Tell tests that they are being executed in parallel --> + <test.parallel.execution>true</test.parallel.execution> <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data> <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir> <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir> @@ -142,6 +159,11 @@ <!-- substitution. Putting a prefix in front of it like --> <!-- "fork-" makes it work. --> <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id> + <!-- Propagate scale parameters --> + <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> + <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> + <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize> + <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> </systemPropertyVariables> <!-- Some tests cannot run in parallel. Tests that cover --> <!-- access to the root directory must run in isolation --> @@ -160,10 +182,11 @@ <excludes> <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude> <exclude>**/ITestS3ABlockingThreadPool.java</exclude> - <exclude>**/ITestS3AFastOutputStream.java</exclude> <exclude>**/ITestS3AFileSystemContract.java</exclude> <exclude>**/ITestS3AMiniYarnCluster.java</exclude> <exclude>**/ITest*Root*.java</exclude> + <exclude>**/ITestS3AFileContextStatistics.java</exclude> + <include>**/ITestS3AHuge*.java</include> </excludes> </configuration> </execution> @@ -174,6 +197,16 @@ <goal>verify</goal> </goals> <configuration> + <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds> + <systemPropertyVariables> + <!-- Tell tests that they are being executed sequentially --> + <test.parallel.execution>false</test.parallel.execution> + <!-- Propagate scale parameters --> + <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> + <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> + <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize> + <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> + </systemPropertyVariables> <!-- Do a sequential run for tests that cannot handle --> <!-- parallel execution. 
--> <includes> @@ -183,6 +216,8 @@ <include>**/ITestS3AFileSystemContract.java</include> <include>**/ITestS3AMiniYarnCluster.java</include> <include>**/ITest*Root*.java</include> + <include>**/ITestS3AFileContextStatistics.java</include> + <include>**/ITestS3AHuge*.java</include> </includes> </configuration> </execution> @@ -210,7 +245,13 @@ <goal>verify</goal> </goals> <configuration> - <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds> + <systemPropertyVariables> + <!-- Propagate scale parameters --> + <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled> + <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize> + <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout> + </systemPropertyVariables> + <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds> </configuration> </execution> </executions> @@ -218,6 +259,19 @@ </plugins> </build> </profile> + + <!-- Turn on scale tests--> + <profile> + <id>scale</id> + <activation> + <property> + <name>scale</name> + </property> + </activation> + <properties > + <fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled> + </properties> + </profile> </profiles> <build> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java index 597cce6..5ff96a5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java @@ -18,30 +18,21 @@ package org.apache.hadoop.fs.s3a; -import java.util.Collection; -import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.ForwardingListeningExecutorService; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.classification.InterfaceAudience; + /** * This ExecutorService blocks the submission of new tasks when its queue is * already full by using a semaphore. 
Task submissions require permits, task @@ -50,17 +41,17 @@ import com.google.common.util.concurrent.MoreExecutors; * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> * this s4 threadpool</a> */ -public class BlockingThreadPoolExecutorService - extends ForwardingListeningExecutorService { +@InterfaceAudience.Private +final class BlockingThreadPoolExecutorService + extends SemaphoredDelegatingExecutor { private static Logger LOG = LoggerFactory .getLogger(BlockingThreadPoolExecutorService.class); - private Semaphore queueingPermits; - private ListeningExecutorService executorDelegatee; - private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); + private final ThreadPoolExecutor eventProcessingExecutor; + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each * created thread uniquely, @@ -69,7 +60,7 @@ public class BlockingThreadPoolExecutorService * @param prefix The prefix of every created Thread's name * @return a {@link java.util.concurrent.ThreadFactory} that names threads */ - public static ThreadFactory getNamedThreadFactory(final String prefix) { + static ThreadFactory getNamedThreadFactory(final String prefix) { SecurityManager s = System.getSecurityManager(); final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); @@ -113,6 +104,12 @@ public class BlockingThreadPoolExecutorService }; } + private BlockingThreadPoolExecutorService(int permitCount, + ThreadPoolExecutor eventProcessingExecutor) { + super(MoreExecutors.listeningDecorator(eventProcessingExecutor), + permitCount, false); + this.eventProcessingExecutor = eventProcessingExecutor; + } /** * A thread pool that that blocks clients submitting additional tasks if @@ -125,10 +122,12 @@ public class BlockingThreadPoolExecutorService * @param unit time unit * @param prefixName prefix of name for threads */ - public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks, - long keepAliveTime, TimeUnit unit, String prefixName) { - super(); - queueingPermits = new Semaphore(waitingTasks + activeTasks, false); + public static BlockingThreadPoolExecutorService newInstance( + int activeTasks, + int waitingTasks, + long keepAliveTime, TimeUnit unit, + String prefixName) { + /* Although we generally only expect up to waitingTasks tasks in the queue, we need to be able to buffer all tasks in case dequeueing is slower than enqueueing. 
*/ @@ -147,126 +146,25 @@ public class BlockingThreadPoolExecutorService } }); eventProcessingExecutor.allowCoreThreadTimeOut(true); - executorDelegatee = - MoreExecutors.listeningDecorator(eventProcessingExecutor); - - } - - @Override - protected ListeningExecutorService delegate() { - return executorDelegatee; - } - - @Override - public <T> ListenableFuture<T> submit(Callable<T> task) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new CallableWithPermitRelease<T>(task)); - } - - @Override - public <T> ListenableFuture<T> submit(Runnable task, T result) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new RunnableWithPermitRelease(task), result); - } - - @Override - public ListenableFuture<?> submit(Runnable task) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new RunnableWithPermitRelease(task)); - } - - @Override - public void execute(Runnable command) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - super.execute(new RunnableWithPermitRelease(command)); + return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks, + eventProcessingExecutor); } /** - * Releases a permit after the task is executed. + * Get the actual number of active threads. + * @return the active thread count */ - class RunnableWithPermitRelease implements Runnable { - - private Runnable delegatee; - - public RunnableWithPermitRelease(Runnable delegatee) { - this.delegatee = delegatee; - } - - @Override - public void run() { - try { - delegatee.run(); - } finally { - queueingPermits.release(); - } - - } - } - - /** - * Releases a permit after the task is completed. - */ - class CallableWithPermitRelease<T> implements Callable<T> { - - private Callable<T> delegatee; - - public CallableWithPermitRelease(Callable<T> delegatee) { - this.delegatee = delegatee; - } - - @Override - public T call() throws Exception { - try { - return delegatee.call(); - } finally { - queueingPermits.release(); - } - } - - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - throw new RuntimeException("Not implemented"); + int getActiveCount() { + return eventProcessingExecutor.getActiveCount(); } @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, - long timeout, TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not implemented"); + public String toString() { + final StringBuilder sb = new StringBuilder( + "BlockingThreadPoolExecutorService{"); + sb.append(super.toString()); + sb.append(", activeCount=").append(getActiveCount()); + sb.append('}'); + return sb.toString(); } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); - } - - @Override - public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, - TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new RuntimeException("Not implemented"); - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 64fd8e5..65df0bf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -35,6 +35,9 @@ public final class Constants { private Constants() { } + /** The minimum multipart size which S3 supports. */ + public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024; + // s3 access key public static final String ACCESS_KEY = "fs.s3a.access.key"; @@ -124,14 +127,72 @@ public final class Constants { // comma separated list of directories public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; - // should we upload directly from memory rather than using a file buffer + // switch to the fast block-by-block upload mechanism public static final String FAST_UPLOAD = "fs.s3a.fast.upload"; public static final boolean DEFAULT_FAST_UPLOAD = false; //initial size of memory buffer for a fast upload + @Deprecated public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size"; public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB + /** + * What buffer to use. + * Default is {@link #FAST_UPLOAD_BUFFER_DISK} + * Value: {@value} + */ + @InterfaceStability.Unstable + public static final String FAST_UPLOAD_BUFFER = + "fs.s3a.fast.upload.buffer"; + + /** + * Buffer blocks to disk: {@value}. + * Capacity is limited to available disk space. + */ + + @InterfaceStability.Unstable + public static final String FAST_UPLOAD_BUFFER_DISK = "disk"; + + /** + * Use an in-memory array. Fast but will run of heap rapidly: {@value}. + */ + @InterfaceStability.Unstable + public static final String FAST_UPLOAD_BUFFER_ARRAY = "array"; + + /** + * Use a byte buffer. May be more memory efficient than the + * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}. + */ + @InterfaceStability.Unstable + public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer"; + + /** + * Default buffer option: {@value}. + */ + @InterfaceStability.Unstable + public static final String DEFAULT_FAST_UPLOAD_BUFFER = + FAST_UPLOAD_BUFFER_DISK; + + /** + * Maximum Number of blocks a single output stream can have + * active (uploading, or queued to the central FileSystem + * instance's pool of queued operations. + * This stops a single stream overloading the shared thread pool. + * {@value} + * <p> + * Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS} + */ + @InterfaceStability.Unstable + public static final String FAST_UPLOAD_ACTIVE_BLOCKS = + "fs.s3a.fast.upload.active.blocks"; + + /** + * Limit of queued block upload operations before writes + * block. 
Value: {@value} + */ + @InterfaceStability.Unstable + public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4; + // Private | PublicRead | PublicReadWrite | AuthenticatedRead | // LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl public static final String CANNED_ACL = "fs.s3a.acl.default"; @@ -145,7 +206,7 @@ public final class Constants { // purge any multipart uploads older than this number of seconds public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; - public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400; + public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400; // s3 server-side encryption public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = @@ -215,4 +276,10 @@ public final class Constants { public static final Class<? extends S3ClientFactory> DEFAULT_S3_CLIENT_FACTORY_IMPL = S3ClientFactory.DefaultS3ClientFactory.class; + + /** + * Maximum number of partitions in a multipart upload: {@value}. + */ + @InterfaceAudience.Private + public static final int MAX_MULTIPART_COUNT = 10000; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java new file mode 100644 index 0000000..b66a23f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -0,0 +1,703 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressEventType; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.util.Progressable; + +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; + +/** + * Upload files/parts directly via different buffering mechanisms: + * including memory and disk. + * + * If the stream is closed and no update has started, then the upload + * is instead done as a single PUT operation. + * + * Unstable: statistics and error handling might evolve. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class S3ABlockOutputStream extends OutputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(S3ABlockOutputStream.class); + + /** Owner FileSystem. */ + private final S3AFileSystem fs; + + /** Object being uploaded. */ + private final String key; + + /** Size of all blocks. */ + private final int blockSize; + + /** Callback for progress. */ + private final ProgressListener progressListener; + private final ListeningExecutorService executorService; + + /** + * Retry policy for multipart commits; not all AWS SDK versions retry that. + */ + private final RetryPolicy retryPolicy = + RetryPolicies.retryUpToMaximumCountWithProportionalSleep( + 5, + 2000, + TimeUnit.MILLISECONDS); + /** + * Factory for blocks. + */ + private final S3ADataBlocks.BlockFactory blockFactory; + + /** Preallocated byte buffer for writing single characters. */ + private final byte[] singleCharWrite = new byte[1]; + + /** Multipart upload details; null means none started. */ + private MultiPartUpload multiPartUpload; + + /** Closed flag. */ + private final AtomicBoolean closed = new AtomicBoolean(false); + + /** Current data block. Null means none currently active */ + private S3ADataBlocks.DataBlock activeBlock; + + /** Count of blocks uploaded. */ + private long blockCount = 0; + + /** Statistics to build up. */ + private final S3AInstrumentation.OutputStreamStatistics statistics; + + /** + * Write operation helper; encapsulation of the filesystem operations. 
+ */ + private final S3AFileSystem.WriteOperationHelper writeOperationHelper; + + /** + * An S3A output stream which uploads partitions in a separate pool of + * threads; different {@link S3ADataBlocks.BlockFactory} + * instances can control where data is buffered. + * + * @param fs S3AFilesystem + * @param key S3 object to work on. + * @param executorService the executor service to use to schedule work + * @param progress report progress in order to prevent timeouts. If + * this object implements {@code ProgressListener} then it will be + * directly wired up to the AWS client, so receive detailed progress + * information. + * @param blockSize size of a single block. + * @param blockFactory factory for creating stream destinations + * @param statistics stats for this stream + * @param writeOperationHelper state of the write operation. + * @throws IOException on any problem + */ + S3ABlockOutputStream(S3AFileSystem fs, + String key, + ExecutorService executorService, + Progressable progress, + long blockSize, + S3ADataBlocks.BlockFactory blockFactory, + S3AInstrumentation.OutputStreamStatistics statistics, + S3AFileSystem.WriteOperationHelper writeOperationHelper) + throws IOException { + this.fs = fs; + this.key = key; + this.blockFactory = blockFactory; + this.blockSize = (int) blockSize; + this.statistics = statistics; + this.writeOperationHelper = writeOperationHelper; + Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, + "Block size is too small: %d", blockSize); + this.executorService = MoreExecutors.listeningDecorator(executorService); + this.multiPartUpload = null; + this.progressListener = (progress instanceof ProgressListener) ? + (ProgressListener) progress + : new ProgressableListener(progress); + // create that first block. This guarantees that an open + close sequence + // writes a 0-byte entry. + createBlockIfNeeded(); + LOG.debug("Initialized S3ABlockOutputStream for {}" + + " output to {}", writeOperationHelper, activeBlock); + } + + /** + * Demand create a destination block. + * @return the active block; null if there isn't one. + * @throws IOException on any failure to create + */ + private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded() + throws IOException { + if (activeBlock == null) { + blockCount++; + if (blockCount>= Constants.MAX_MULTIPART_COUNT) { + LOG.error("Number of partitions in stream exceeds limit for S3: " + + + Constants.MAX_MULTIPART_COUNT + " write may fail."); + } + activeBlock = blockFactory.create(this.blockSize); + } + return activeBlock; + } + + /** + * Synchronized accessor to the active block. + * @return the active block; null if there isn't one. + */ + private synchronized S3ADataBlocks.DataBlock getActiveBlock() { + return activeBlock; + } + + /** + * Predicate to query whether or not there is an active block. + * @return true if there is an active block. + */ + private synchronized boolean hasActiveBlock() { + return activeBlock != null; + } + + /** + * Clear the active block. + */ + private void clearActiveBlock() { + LOG.debug("Clearing active block"); + synchronized (this) { + activeBlock = null; + } + } + + /** + * Check for the filesystem being open. + * @throws IOException if the filesystem is closed. + */ + void checkOpen() throws IOException { + if (closed.get()) { + throw new IOException("Filesystem " + writeOperationHelper + " closed"); + } + } + + /** + * The flush operation does not trigger an upload; that awaits + * the next block being full. 
What it does do is call {@code flush() } + * on the current block, leaving it to choose how to react. + * @throws IOException Any IO problem. + */ + @Override + public synchronized void flush() throws IOException { + checkOpen(); + S3ADataBlocks.DataBlock dataBlock = getActiveBlock(); + if (dataBlock != null) { + dataBlock.flush(); + } + } + + /** + * Writes a byte to the destination. If this causes the buffer to reach + * its limit, the actual upload is submitted to the threadpool. + * @param b the int of which the lowest byte is written + * @throws IOException on any problem + */ + @Override + public synchronized void write(int b) throws IOException { + singleCharWrite[0] = (byte)b; + write(singleCharWrite, 0, 1); + } + + /** + * Writes a range of bytes from to the memory buffer. If this causes the + * buffer to reach its limit, the actual upload is submitted to the + * threadpool and the remainder of the array is written to memory + * (recursively). + * @param source byte array containing + * @param offset offset in array where to start + * @param len number of bytes to be written + * @throws IOException on any problem + */ + @Override + public synchronized void write(byte[] source, int offset, int len) + throws IOException { + + S3ADataBlocks.validateWriteArgs(source, offset, len); + checkOpen(); + if (len == 0) { + return; + } + S3ADataBlocks.DataBlock block = createBlockIfNeeded(); + int written = block.write(source, offset, len); + int remainingCapacity = block.remainingCapacity(); + if (written < len) { + // not everything was written âthe block has run out + // of capacity + // Trigger an upload then process the remainder. + LOG.debug("writing more data than block has capacity -triggering upload"); + uploadCurrentBlock(); + // tail recursion is mildly expensive, but given buffer sizes must be MB. + // it's unlikely to recurse very deeply. + this.write(source, offset + written, len - written); + } else { + if (remainingCapacity == 0) { + // the whole buffer is done, trigger an upload + uploadCurrentBlock(); + } + } + } + + /** + * Start an asynchronous upload of the current block. + * @throws IOException Problems opening the destination for upload + * or initializing the upload. + */ + private synchronized void uploadCurrentBlock() throws IOException { + Preconditions.checkState(hasActiveBlock(), "No active block"); + LOG.debug("Writing block # {}", blockCount); + if (multiPartUpload == null) { + LOG.debug("Initiating Multipart upload"); + multiPartUpload = new MultiPartUpload(); + } + try { + multiPartUpload.uploadBlockAsync(getActiveBlock()); + } finally { + // set the block to null, so the next write will create a new block. + clearActiveBlock(); + } + } + + /** + * Close the stream. + * + * This will not return until the upload is complete + * or the attempt to perform the upload has failed. + * Exceptions raised in this method are indicative that the write has + * failed and data is at risk of being lost. + * @throws IOException on any failure. + */ + @Override + public void close() throws IOException { + if (closed.getAndSet(true)) { + // already closed + LOG.debug("Ignoring close() as stream is already closed"); + return; + } + S3ADataBlocks.DataBlock block = getActiveBlock(); + boolean hasBlock = hasActiveBlock(); + LOG.debug("{}: Closing block #{}: current block= {}", + this, + blockCount, + hasBlock ? block : "(none)"); + try { + if (multiPartUpload == null) { + if (hasBlock) { + // no uploads of data have taken place, put the single block up. 
+ // This must happen even if there is no data, so that 0 byte files + // are created. + putObject(); + } + } else { + // there has already been at least one block scheduled for upload; + // put up the current then wait + if (hasBlock && block.hasData()) { + //send last part + uploadCurrentBlock(); + } + // wait for the partial uploads to finish + final List<PartETag> partETags = + multiPartUpload.waitForAllPartUploads(); + // then complete the operation + multiPartUpload.complete(partETags); + } + LOG.debug("Upload complete for {}", writeOperationHelper); + } catch (IOException ioe) { + writeOperationHelper.writeFailed(ioe); + throw ioe; + } finally { + LOG.debug("Closing block and factory"); + IOUtils.closeStream(block); + IOUtils.closeStream(blockFactory); + LOG.debug("Statistics: {}", statistics); + IOUtils.closeStream(statistics); + clearActiveBlock(); + } + // All end of write operations, including deleting fake parent directories + writeOperationHelper.writeSuccessful(); + } + + /** + * Upload the current block as a single PUT request; if the buffer + * is empty a 0-byte PUT will be invoked, as it is needed to create an + * entry at the far end. + * @throws IOException any problem. + */ + private void putObject() throws IOException { + LOG.debug("Executing regular upload for {}", writeOperationHelper); + + final S3ADataBlocks.DataBlock block = getActiveBlock(); + int size = block.dataSize(); + final PutObjectRequest putObjectRequest = + writeOperationHelper.newPutRequest( + block.startUpload(), + size); + long transferQueueTime = now(); + BlockUploadProgress callback = + new BlockUploadProgress( + block, progressListener, transferQueueTime); + putObjectRequest.setGeneralProgressListener(callback); + statistics.blockUploadQueued(size); + ListenableFuture<PutObjectResult> putObjectResult = + executorService.submit(new Callable<PutObjectResult>() { + @Override + public PutObjectResult call() throws Exception { + PutObjectResult result = fs.putObjectDirect(putObjectRequest); + block.close(); + return result; + } + }); + clearActiveBlock(); + //wait for completion + try { + putObjectResult.get(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted object upload", ie); + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + throw extractException("regular upload", key, ee); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "S3ABlockOutputStream{"); + sb.append(writeOperationHelper.toString()); + sb.append(", blockSize=").append(blockSize); + // unsynced access; risks consistency in exchange for no risk of deadlock. + S3ADataBlocks.DataBlock block = activeBlock; + if (block != null) { + sb.append(", activeBlock=").append(block); + } + sb.append('}'); + return sb.toString(); + } + + private void incrementWriteOperations() { + fs.incrementWriteOperations(); + } + + /** + * Current time in milliseconds. + * @return time + */ + private long now() { + return System.currentTimeMillis(); + } + + /** + * Multiple partition upload. + */ + private class MultiPartUpload { + private final String uploadId; + private final List<ListenableFuture<PartETag>> partETagsFutures; + + public MultiPartUpload() throws IOException { + this.uploadId = writeOperationHelper.initiateMultiPartUpload(); + this.partETagsFutures = new ArrayList<>(2); + LOG.debug("Initiated multi-part upload for {} with " + + "id '{}'", writeOperationHelper, uploadId); + } + + /** + * Upload a block of data. 
+ * This will take the block + * @param block block to upload + * @throws IOException upload failure + */ + private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) + throws IOException { + LOG.debug("Queueing upload of {}", block); + final int size = block.dataSize(); + final InputStream uploadStream = block.startUpload(); + final int currentPartNumber = partETagsFutures.size() + 1; + final UploadPartRequest request = + writeOperationHelper.newUploadPartRequest( + uploadId, + uploadStream, + currentPartNumber, + size); + long transferQueueTime = now(); + BlockUploadProgress callback = + new BlockUploadProgress( + block, progressListener, transferQueueTime); + request.setGeneralProgressListener(callback); + statistics.blockUploadQueued(block.dataSize()); + ListenableFuture<PartETag> partETagFuture = + executorService.submit(new Callable<PartETag>() { + @Override + public PartETag call() throws Exception { + // this is the queued upload operation + LOG.debug("Uploading part {} for id '{}'", currentPartNumber, + uploadId); + // do the upload + PartETag partETag = fs.uploadPart(request).getPartETag(); + LOG.debug("Completed upload of {}", block); + LOG.debug("Stream statistics of {}", statistics); + + // close the block + block.close(); + return partETag; + } + }); + partETagsFutures.add(partETagFuture); + } + + /** + * Block awaiting all outstanding uploads to complete. + * @return list of results + * @throws IOException IO Problems + */ + private List<PartETag> waitForAllPartUploads() throws IOException { + LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size()); + try { + return Futures.allAsList(partETagsFutures).get(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted partUpload", ie); + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException ee) { + //there is no way of recovering so abort + //cancel all partUploads + LOG.debug("While waiting for upload completion", ee); + LOG.debug("Cancelling futures"); + for (ListenableFuture<PartETag> future : partETagsFutures) { + future.cancel(true); + } + //abort multipartupload + this.abort(); + throw extractException("Multi-part upload with id '" + uploadId + + "' to " + key, key, ee); + } + } + + /** + * This completes a multipart upload. + * Sometimes it fails; here retries are handled to avoid losing all data + * on a transient failure. + * @param partETags list of partial uploads + * @throws IOException on any problem + */ + private CompleteMultipartUploadResult complete(List<PartETag> partETags) + throws IOException { + int retryCount = 0; + AmazonClientException lastException; + String operation = + String.format("Completing multi-part upload for key '%s'," + + " id '%s' with %s partitions ", + key, uploadId, partETags.size()); + do { + try { + LOG.debug(operation); + return writeOperationHelper.completeMultipartUpload( + uploadId, + partETags); + } catch (AmazonClientException e) { + lastException = e; + statistics.exceptionInMultipartComplete(); + } + } while (shouldRetry(operation, lastException, retryCount++)); + // this point is only reached if the operation failed more than + // the allowed retry count + throw translateException(operation, key, lastException); + } + + /** + * Abort a multi-part upload. Retries are attempted on failures. + * IOExceptions are caught; this is expected to be run as a cleanup process. 
+ */ + public void abort() { + int retryCount = 0; + AmazonClientException lastException; + fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED); + String operation = + String.format("Aborting multi-part upload for '%s', id '%s", + writeOperationHelper, uploadId); + do { + try { + LOG.debug(operation); + writeOperationHelper.abortMultipartUpload(uploadId); + return; + } catch (AmazonClientException e) { + lastException = e; + statistics.exceptionInMultipartAbort(); + } + } while (shouldRetry(operation, lastException, retryCount++)); + // this point is only reached if the operation failed more than + // the allowed retry count + LOG.warn("Unable to abort multipart upload, you may need to purge " + + "uploaded parts", lastException); + } + + /** + * Predicate to determine whether a failed operation should + * be attempted again. + * If a retry is advised, the exception is automatically logged and + * the filesystem statistic {@link Statistic#IGNORED_ERRORS} incremented. + * The method then sleeps for the sleep time suggested by the sleep policy; + * if the sleep is interrupted then {@code Thread.interrupted()} is set + * to indicate the thread was interrupted; then false is returned. + * + * @param operation operation for log message + * @param e exception raised. + * @param retryCount number of retries already attempted + * @return true if another attempt should be made + */ + private boolean shouldRetry(String operation, + AmazonClientException e, + int retryCount) { + try { + RetryPolicy.RetryAction retryAction = + retryPolicy.shouldRetry(e, retryCount, 0, true); + boolean retry = retryAction == RetryPolicy.RetryAction.RETRY; + if (retry) { + fs.incrementStatistic(IGNORED_ERRORS); + LOG.info("Retrying {} after exception ", operation, e); + Thread.sleep(retryAction.delayMillis); + } + return retry; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; + } catch (Exception ignored) { + return false; + } + } + + } + + /** + * The upload progress listener registered for events returned + * during the upload of a single block. + * It updates statistics and handles the end of the upload. + * Transfer failures are logged at WARN. + */ + private final class BlockUploadProgress implements ProgressListener { + private final S3ADataBlocks.DataBlock block; + private final ProgressListener nextListener; + private final long transferQueueTime; + private long transferStartTime; + + /** + * Track the progress of a single block upload. 
+ * @param block block to monitor + * @param nextListener optional next progress listener + * @param transferQueueTime time the block was transferred + * into the queue + */ + private BlockUploadProgress(S3ADataBlocks.DataBlock block, + ProgressListener nextListener, + long transferQueueTime) { + this.block = block; + this.transferQueueTime = transferQueueTime; + this.nextListener = nextListener; + } + + @Override + public void progressChanged(ProgressEvent progressEvent) { + ProgressEventType eventType = progressEvent.getEventType(); + long bytesTransferred = progressEvent.getBytesTransferred(); + + int size = block.dataSize(); + switch (eventType) { + + case REQUEST_BYTE_TRANSFER_EVENT: + // bytes uploaded + statistics.bytesTransferred(bytesTransferred); + break; + + case TRANSFER_PART_STARTED_EVENT: + transferStartTime = now(); + statistics.blockUploadStarted(transferStartTime - transferQueueTime, + size); + incrementWriteOperations(); + break; + + case TRANSFER_PART_COMPLETED_EVENT: + statistics.blockUploadCompleted(now() - transferStartTime, size); + break; + + case TRANSFER_PART_FAILED_EVENT: + statistics.blockUploadFailed(now() - transferStartTime, size); + LOG.warn("Transfer failure of block {}", block); + break; + + default: + // nothing + } + + if (nextListener != null) { + nextListener.progressChanged(progressEvent); + } + } + } + + /** + * Bridge from AWS {@code ProgressListener} to Hadoop {@link Progressable}. + */ + private static class ProgressableListener implements ProgressListener { + private final Progressable progress; + + public ProgressableListener(Progressable progress) { + this.progress = progress; + } + + public void progressChanged(ProgressEvent progressEvent) { + if (progress != null) { + progress.progress(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java new file mode 100644 index 0000000..0fe2af7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -0,0 +1,821 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.util.DirectBufferPool; + +import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*; + +/** + * Set of classes to support output streaming into blocks which are then + * uploaded as partitions. + */ +final class S3ADataBlocks { + + private static final Logger LOG = + LoggerFactory.getLogger(S3ADataBlocks.class); + + private S3ADataBlocks() { + } + + /** + * Validate args to a write command. These are the same validation checks + * expected for any implementation of {@code OutputStream.write()}. + * @param b byte array containing data + * @param off offset in array where to start + * @param len number of bytes to be written + * @throws NullPointerException for a null buffer + * @throws IndexOutOfBoundsException if indices are out of range + */ + static void validateWriteArgs(byte[] b, int off, int len) + throws IOException { + Preconditions.checkNotNull(b); + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException( + "write (b[" + b.length + "], " + off + ", " + len + ')'); + } + } + + /** + * Create a factory. + * @param owner factory owner + * @param name factory name -the option from {@link Constants}. + * @return the factory, ready to be initialized. + * @throws IllegalArgumentException if the name is unknown. + */ + static BlockFactory createFactory(S3AFileSystem owner, + String name) { + switch (name) { + case Constants.FAST_UPLOAD_BUFFER_ARRAY: + return new ArrayBlockFactory(owner); + case Constants.FAST_UPLOAD_BUFFER_DISK: + return new DiskBlockFactory(owner); + case Constants.FAST_UPLOAD_BYTEBUFFER: + return new ByteBufferBlockFactory(owner); + default: + throw new IllegalArgumentException("Unsupported block buffer" + + " \"" + name + '"'); + } + } + + /** + * Base class for block factories. + */ + static abstract class BlockFactory implements Closeable { + + private final S3AFileSystem owner; + + protected BlockFactory(S3AFileSystem owner) { + this.owner = owner; + } + + + /** + * Create a block. + * @param limit limit of the block. + * @return a new block. + */ + abstract DataBlock create(int limit) throws IOException; + + /** + * Implement any close/cleanup operation. + * Base class is a no-op + * @throws IOException -ideally, it shouldn't. + */ + @Override + public void close() throws IOException { + } + + /** + * Owner. + */ + protected S3AFileSystem getOwner() { + return owner; + } + } + + /** + * This represents a block being uploaded. + */ + static abstract class DataBlock implements Closeable { + + enum DestState {Writing, Upload, Closed} + + private volatile DestState state = Writing; + + /** + * Atomically enter a state, verifying current state. + * @param current current state. 
null means "no check" + * @param next next state + * @throws IllegalStateException if the current state is not as expected + */ + protected synchronized final void enterState(DestState current, + DestState next) + throws IllegalStateException { + verifyState(current); + LOG.debug("{}: entering state {}", this, next); + state = next; + } + + /** + * Verify that the block is in the declared state. + * @param expected expected state. + * @throws IllegalStateException if the DataBlock is in the wrong state + */ + protected final void verifyState(DestState expected) + throws IllegalStateException { + if (expected != null && state != expected) { + throw new IllegalStateException("Expected stream state " + expected + + " -but actual state is " + state + " in " + this); + } + } + + /** + * Current state. + * @return the current state. + */ + final DestState getState() { + return state; + } + + /** + * Return the current data size. + * @return the size of the data + */ + abstract int dataSize(); + + /** + * Predicate to verify that the block has the capacity to write + * the given set of bytes. + * @param bytes number of bytes desired to be written. + * @return true if there is enough space. + */ + abstract boolean hasCapacity(long bytes); + + /** + * Predicate to check if there is data in the block. + * @return true if there is + */ + boolean hasData() { + return dataSize() > 0; + } + + /** + * The remaining capacity in the block before it is full. + * @return the number of bytes remaining. + */ + abstract int remainingCapacity(); + + /** + * Write a series of bytes from the buffer, from the offset. + * Returns the number of bytes written. + * Only valid in the state {@code Writing}. + * Base class verifies the state but does no writing. + * @param buffer buffer + * @param offset offset + * @param length length of write + * @return number of bytes written + * @throws IOException trouble + */ + int write(byte[] buffer, int offset, int length) throws IOException { + verifyState(Writing); + Preconditions.checkArgument(buffer != null, "Null buffer"); + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(offset >= 0, "offset is negative"); + Preconditions.checkArgument( + !(buffer.length - offset < length), + "buffer shorter than amount of data to write"); + return 0; + } + + /** + * Flush the output. + * Only valid in the state {@code Writing}. + * In the base class, this is a no-op + * @throws IOException any IO problem. + */ + void flush() throws IOException { + verifyState(Writing); + } + + /** + * Switch to the upload state and return a stream for uploading. + * Base class calls {@link #enterState(DestState, DestState)} to + * manage the state machine. + * @return the stream + * @throws IOException trouble + */ + InputStream startUpload() throws IOException { + LOG.debug("Start datablock upload"); + enterState(Writing, Upload); + return null; + } + + /** + * Enter the closed state. + * @return true if the class was in any other state, implying that + * the subclass should do its close operations + */ + protected synchronized boolean enterClosedState() { + if (!state.equals(Closed)) { + enterState(null, Closed); + return true; + } else { + return false; + } + } + + @Override + public void close() throws IOException { + if (enterClosedState()) { + LOG.debug("Closed {}", this); + innerClose(); + } + } + + /** + * Inner close logic for subclasses to implement. 
+ */ + protected void innerClose() throws IOException { + + } + + } + + // ==================================================================== + + /** + * Use byte arrays on the heap for storage. + */ + static class ArrayBlockFactory extends BlockFactory { + + ArrayBlockFactory(S3AFileSystem owner) { + super(owner); + } + + @Override + DataBlock create(int limit) throws IOException { + return new ByteArrayBlock(limit); + } + + } + + /** + * Stream to memory via a {@code ByteArrayOutputStream}. + * + * This was taken from {@code S3AFastOutputStream} and has the + * same problem which surfaced there: it can consume a lot of heap space + * proportional to the mismatch between writes to the stream and + * the JVM-wide upload bandwidth to the S3 endpoint. + * The memory consumption can be limited by tuning the filesystem settings + * to restrict the number of queued/active uploads. + */ + + static class ByteArrayBlock extends DataBlock { + private ByteArrayOutputStream buffer; + private final int limit; + // cache data size so that it is consistent after the buffer is reset. + private Integer dataSize; + + ByteArrayBlock(int limit) { + this.limit = limit; + buffer = new ByteArrayOutputStream(); + } + + /** + * Get the amount of data; if there is no buffer then the size is 0. + * @return the amount of data available to upload. + */ + @Override + int dataSize() { + return dataSize != null ? dataSize : buffer.size(); + } + + @Override + InputStream startUpload() throws IOException { + super.startUpload(); + dataSize = buffer.size(); + ByteArrayInputStream bufferData = new ByteArrayInputStream( + buffer.toByteArray()); + buffer = null; + return bufferData; + } + + @Override + boolean hasCapacity(long bytes) { + return dataSize() + bytes <= limit; + } + + @Override + int remainingCapacity() { + return limit - dataSize(); + } + + @Override + int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + buffer.write(b, offset, written); + return written; + } + + @Override + protected void innerClose() { + buffer = null; + } + + @Override + public String toString() { + return "ByteArrayBlock{" + + "state=" + getState() + + ", limit=" + limit + + ", dataSize=" + dataSize + + '}'; + } + } + + // ==================================================================== + + /** + * Stream via Direct ByteBuffers; these are allocated off heap + * via {@link DirectBufferPool}. + * This is actually the most complex of all the block factories, + * due to the need to explicitly recycle buffers; in comparison, the + * {@link DiskBlock} buffer delegates the work of deleting files to + * the {@link DiskBlock.FileDeletingInputStream}. Here the + * input stream {@link ByteBufferInputStream} has a similar task, along + * with the foundational work of streaming data from a byte array. 
+ */ + + static class ByteBufferBlockFactory extends BlockFactory { + + private final DirectBufferPool bufferPool = new DirectBufferPool(); + private final AtomicInteger buffersOutstanding = new AtomicInteger(0); + + ByteBufferBlockFactory(S3AFileSystem owner) { + super(owner); + } + + @Override + ByteBufferBlock create(int limit) throws IOException { + return new ByteBufferBlock(limit); + } + + private ByteBuffer requestBuffer(int limit) { + LOG.debug("Requesting buffer of size {}", limit); + buffersOutstanding.incrementAndGet(); + return bufferPool.getBuffer(limit); + } + + private void releaseBuffer(ByteBuffer buffer) { + LOG.debug("Releasing buffer"); + bufferPool.returnBuffer(buffer); + buffersOutstanding.decrementAndGet(); + } + + /** + * Get count of outstanding buffers. + * @return the current buffer count + */ + public int getOutstandingBufferCount() { + return buffersOutstanding.get(); + } + + @Override + public String toString() { + return "ByteBufferBlockFactory{" + + "buffersOutstanding=" + buffersOutstanding + + '}'; + } + + /** + * A DataBlock which requests a buffer from pool on creation; returns + * it when the output stream is closed. + */ + class ByteBufferBlock extends DataBlock { + private ByteBuffer buffer; + private final int bufferSize; + // cache data size so that it is consistent after the buffer is reset. + private Integer dataSize; + + /** + * Instantiate. This will request a ByteBuffer of the desired size. + * @param bufferSize buffer size + */ + ByteBufferBlock(int bufferSize) { + this.bufferSize = bufferSize; + buffer = requestBuffer(bufferSize); + } + + /** + * Get the amount of data; if there is no buffer then the size is 0. + * @return the amount of data available to upload. + */ + @Override + int dataSize() { + return dataSize != null ? dataSize : bufferCapacityUsed(); + } + + @Override + ByteBufferInputStream startUpload() throws IOException { + super.startUpload(); + dataSize = bufferCapacityUsed(); + // set the buffer up from reading from the beginning + buffer.limit(buffer.position()); + buffer.position(0); + return new ByteBufferInputStream(dataSize, buffer); + } + + @Override + public boolean hasCapacity(long bytes) { + return bytes <= remainingCapacity(); + } + + @Override + public int remainingCapacity() { + return buffer != null ? buffer.remaining() : 0; + } + + private int bufferCapacityUsed() { + return buffer.capacity() - buffer.remaining(); + } + + @Override + int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + buffer.put(b, offset, written); + return written; + } + + @Override + protected void innerClose() { + buffer = null; + } + + @Override + public String toString() { + return "ByteBufferBlock{" + + "state=" + getState() + + ", dataSize=" + dataSize() + + ", limit=" + bufferSize + + ", remainingCapacity=" + remainingCapacity() + + '}'; + } + + } + + /** + * Provide an input stream from a byte buffer; supporting + * {@link #mark(int)}, which is required to enable replay of failed + * PUT attempts. + * This input stream returns the buffer to the pool afterwards. + */ + class ByteBufferInputStream extends InputStream { + + private final int size; + private ByteBuffer byteBuffer; + + ByteBufferInputStream(int size, ByteBuffer byteBuffer) { + LOG.debug("Creating ByteBufferInputStream of size {}", size); + this.size = size; + this.byteBuffer = byteBuffer; + } + + /** + * Return the buffer to the pool after the stream is closed. 
+ */ + @Override + public synchronized void close() { + if (byteBuffer != null) { + LOG.debug("releasing buffer"); + releaseBuffer(byteBuffer); + byteBuffer = null; + } + } + + /** + * Verify that the stream is open. + * @throws IOException if the stream is closed + */ + private void verifyOpen() throws IOException { + if (byteBuffer == null) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + public synchronized int read() throws IOException { + if (available() > 0) { + return byteBuffer.get() & 0xFF; + } else { + return -1; + } + } + + @Override + public synchronized long skip(long offset) throws IOException { + verifyOpen(); + long newPos = position() + offset; + if (newPos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + if (newPos > size) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } + byteBuffer.position((int) newPos); + return newPos; + } + + @Override + public synchronized int available() { + Preconditions.checkState(byteBuffer != null, + FSExceptionMessages.STREAM_IS_CLOSED); + return byteBuffer.remaining(); + } + + /** + * Get the current buffer position. + * @return the buffer position + */ + public synchronized int position() { + return byteBuffer.position(); + } + + /** + * Check if there is data left. + * @return true if there is data remaining in the buffer. + */ + public synchronized boolean hasRemaining() { + return byteBuffer.hasRemaining(); + } + + @Override + public synchronized void mark(int readlimit) { + LOG.debug("mark at {}", position()); + byteBuffer.mark(); + } + + @Override + public synchronized void reset() throws IOException { + LOG.debug("reset"); + byteBuffer.reset(); + } + + @Override + public boolean markSupported() { + return true; + } + + /** + * Read in data. + * @param buffer destination buffer + * @param offset offset within the buffer + * @param length length of bytes to read + * @throws EOFException if the position is negative + * @throws IndexOutOfBoundsException if there isn't space for the + * amount of data requested. + * @throws IllegalArgumentException other arguments are invalid. + */ + @SuppressWarnings("NullableProblems") + public synchronized int read(byte[] buffer, int offset, int length) + throws IOException { + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(buffer != null, "Null buffer"); + if (buffer.length - offset < length) { + throw new IndexOutOfBoundsException( + FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER + + ": request length =" + length + + ", with offset =" + offset + + "; buffer capacity =" + (buffer.length - offset)); + } + verifyOpen(); + if (!hasRemaining()) { + return -1; + } + + int toRead = Math.min(length, available()); + byteBuffer.get(buffer, offset, toRead); + return toRead; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ByteBufferInputStream{"); + sb.append("size=").append(size); + ByteBuffer buffer = this.byteBuffer; + if (buffer != null) { + sb.append(", available=").append(buffer.remaining()); + } + sb.append('}'); + return sb.toString(); + } + } + } + + // ==================================================================== + + /** + * Buffer blocks to disk. + */ + static class DiskBlockFactory extends BlockFactory { + + DiskBlockFactory(S3AFileSystem owner) { + super(owner); + } + + /** + * Create a temp file and a block which writes to it. + * @param limit limit of the block. 
+     * @return the new block
+     * @throws IOException IO problems
+     */
+    @Override
+    DataBlock create(int limit) throws IOException {
+      File destFile = getOwner()
+          .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
+      return new DiskBlock(destFile, limit);
+    }
+  }
+
+  /**
+   * Stream to a file.
+   * This will stop at the limit; the caller is expected to create a new
+   * block.
+   */
+  static class DiskBlock extends DataBlock {
+
+    private int bytesWritten;
+    private final File bufferFile;
+    private final int limit;
+    private BufferedOutputStream out;
+    private InputStream uploadStream;
+
+    DiskBlock(File bufferFile, int limit)
+        throws FileNotFoundException {
+      this.limit = limit;
+      this.bufferFile = bufferFile;
+      out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+    }
+
+    @Override
+    int dataSize() {
+      return bytesWritten;
+    }
+
+    @Override
+    boolean hasCapacity(long bytes) {
+      return dataSize() + bytes <= limit;
+    }
+
+    @Override
+    int remainingCapacity() {
+      return limit - bytesWritten;
+    }
+
+    @Override
+    int write(byte[] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+      int written = Math.min(remainingCapacity(), len);
+      out.write(b, offset, written);
+      bytesWritten += written;
+      return written;
+    }
+
+    @Override
+    InputStream startUpload() throws IOException {
+      super.startUpload();
+      try {
+        out.flush();
+      } finally {
+        out.close();
+        out = null;
+      }
+      uploadStream = new FileInputStream(bufferFile);
+      return new FileDeletingInputStream(uploadStream);
+    }
+
+    /**
+     * The close operation will delete the destination file if it still
+     * exists.
+     * @throws IOException IO problems
+     */
+    @Override
+    protected void innerClose() throws IOException {
+      final DestState state = getState();
+      LOG.debug("Closing {}", this);
+      switch (state) {
+      case Writing:
+        if (bufferFile.exists()) {
+          // file was not uploaded
+          LOG.debug("Deleting buffer file as upload did not start");
+          boolean deleted = bufferFile.delete();
+          if (!deleted && bufferFile.exists()) {
+            LOG.warn("Failed to delete buffer file {}", bufferFile);
+          }
+        }
+        break;
+
+      case Upload:
+        LOG.debug("Buffer file {} exists; closing upload stream", bufferFile);
+        break;
+
+      case Closed:
+        // no-op
+        break;
+
+      default:
+        // this state can never be reached, but checkstyle complains, so
+        // it is here.
+      }
+    }
+
+    /**
+     * Flush operation will flush to disk.
+     * @throws IOException IOE raised on FileOutputStream
+     */
+    @Override
+    void flush() throws IOException {
+      super.flush();
+      out.flush();
+    }
+
+    @Override
+    public String toString() {
+      return "FileBlock{"
+          + "destFile=" + bufferFile
+          + ", state=" + getState()
+          + ", dataSize=" + dataSize()
+          + ", limit=" + limit
+          + '}';
+    }
+
+    /**
+     * An input stream which deletes the buffer file when closed.
+     */
+    private final class FileDeletingInputStream extends FilterInputStream {
+      private final AtomicBoolean closed = new AtomicBoolean(false);
+
+      FileDeletingInputStream(InputStream source) {
+        super(source);
+      }
+
+      /**
+       * Delete the input file when closed.
+       * @throws IOException IO problem
+       */
+      @Override
+      public void close() throws IOException {
+        try {
+          super.close();
+        } finally {
+          if (!closed.getAndSet(true)) {
+            if (!bufferFile.delete()) {
+              LOG.warn("delete({}) returned false",
+                  bufferFile.getAbsoluteFile());
+            }
+          }
+        }
+      }
+    }
+  }
+
+}
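
For reviewers following the new buffering code: below is a minimal sketch, not part of the commit, of how a caller such as S3ABlockOutputStream is expected to drive one of these blocks through its Writing -> Upload -> Closed lifecycle, using only the package-private API shown in the diff above. The class name, method name and the 8 MB block limit are illustrative assumptions, not values taken from the patch.

    package org.apache.hadoop.fs.s3a;

    import java.io.IOException;
    import java.io.InputStream;

    /**
     * Illustrative sketch only -not part of this patch. Shows the lifecycle
     * a caller is expected to follow for a single block: write until the
     * block is full, then hand the buffered data off for upload.
     */
    final class DataBlockLifecycleSketch {

      private DataBlockLifecycleSketch() {
      }

      /**
       * Buffer {@code data} into one block and switch it to the upload state.
       * The 8 MB limit is an arbitrary example value.
       * @return the stream a part upload would read from
       */
      static InputStream bufferAndStartUpload(
          S3ADataBlocks.BlockFactory factory,
          byte[] data) throws IOException {
        S3ADataBlocks.DataBlock block = factory.create(8 * 1024 * 1024);
        int offset = 0;
        while (offset < data.length && block.remainingCapacity() > 0) {
          // write() returns how much was accepted; it never exceeds the
          // remaining capacity, so a real caller rolls over to a new block
          // (and queues another part upload) once this loop stops short.
          offset += block.write(data, offset, data.length - offset);
        }
        block.flush();
        // Moves the block from Writing to Upload and returns the InputStream
        // used for the PUT or multipart part upload; block.close() is still
        // required afterwards to release the heap buffer, direct buffer or
        // temporary file behind it.
        return block.startUpload();
      }
    }

The design point is that each buffer option (array, bytebuffer, disk) only changes which DataBlock subclass the factory hands back; the write/startUpload/close sequence, and therefore the upload code driving it, stays the same.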