HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15b7076a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15b7076a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15b7076a

Branch: refs/heads/trunk
Commit: 15b7076ad5f2ae92d231140b2f8cebc392a92c87
Parents: e17e5ba
Author: Steve Loughran <ste...@apache.org>
Authored: Tue Mar 3 16:18:39 2015 -0800
Committer: Steve Loughran <ste...@apache.org>
Committed: Tue Mar 3 16:18:51 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../src/main/resources/core-default.xml         |  20 +-
 .../org/apache/hadoop/fs/s3a/Constants.java     |   8 +
 .../hadoop/fs/s3a/S3AFastOutputStream.java      | 413 +++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  24 +-
 .../src/site/markdown/tools/hadoop-aws/index.md |  46 ++-
 .../hadoop/fs/s3a/TestS3AFastOutputStream.java  |  74 ++++
 7 files changed, 570 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index 11785f2..cb5cd4d 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -667,6 +667,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11620. Add support for load balancing across a group of KMS for HA.
     (Arun Suresh via wang)
 
+    HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
+
   BUG FIXES
 
     HADOOP-11512. Use getTrimmedStrings when reading serialization keys

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 80dd15b..74390d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -763,13 +763,13 @@ for ldap providers in the same way as above does.
 <property>
   <name>fs.s3a.connection.establish.timeout</name>
   <value>5000</value>
-  <description>Socket connection setup timeout in seconds.</description>
+  <description>Socket connection setup timeout in milliseconds.</description>
 </property>
 
 <property>
   <name>fs.s3a.connection.timeout</name>
   <value>50000</value>
-  <description>Socket connection timeout in seconds.</description>
+  <description>Socket connection timeout in milliseconds.</description>
 </property>
 
 <property>
@@ -846,6 +846,22 @@ for ldap providers in the same way as above does.
 </property>
 
 <property>
+  <name>fs.s3a.fast.upload</name>
+  <value>false</value>
+  <description>Upload directly from memory instead of buffering to
+    disk first. Memory usage and parallelism can be controlled as up to
+    fs.s3a.multipart.size memory is consumed for each (part)upload actively
+    uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+</property>
+
+  <property>
+  <name>fs.s3a.fast.buffer.size</name>
+  <value>1048576</value>
+  <description>Size of initial memory buffer in bytes allocated for an
+    upload. No effect if fs.s3a.fast.upload is false.</description>
+</property>
+
+<property>
   <name>fs.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
   <description>The implementation class of the S3A Filesystem</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1d4f67b..e7462dc 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -83,6 +83,14 @@ public class Constants {
   // comma separated list of directories
   public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
 
+  // should we upload directly from memory rather than using a file buffer
+  public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
+  public static final boolean DEFAULT_FAST_UPLOAD = false;
+
+  //initial size of memory buffer for a fast upload
+  public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
+  public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
+
   // private | public-read | public-read-write | authenticated-read | 
   // log-delivery-write | bucket-owner-read | bucket-owner-full-control
   public static final String CANNED_ACL = "fs.s3a.acl.default";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
new file mode 100644
index 0000000..a29c47b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * Upload files/parts asap directly from a memory buffer (instead of buffering
+ * to a file).
+ * <p/>
+ * Uploads are managed low-level rather than through the AWS TransferManager.
+ * This allows for uploading each part of a multi-part upload as soon as
+ * the bytes are in memory, rather than waiting until the file is closed.
+ * <p/>
+ * Unstable: statistics and error handling might evolve
+ */
+@InterfaceStability.Unstable
+public class S3AFastOutputStream extends OutputStream {
+
+  private static final Logger LOG = S3AFileSystem.LOG;
+  private final String key;
+  private final String bucket;
+  private final AmazonS3Client client;
+  private final int partSize;
+  private final int multiPartThreshold;
+  private final S3AFileSystem fs;
+  private final CannedAccessControlList cannedACL;
+  private final FileSystem.Statistics statistics;
+  private final String serverSideEncryptionAlgorithm;
+  private final ProgressListener progressListener;
+  private final ListeningExecutorService executorService;
+  private MultiPartUpload multiPartUpload;
+  private boolean closed;
+  private ByteArrayOutputStream buffer;
+  private int bufferLimit;
+
+
+  /**
+   * Creates a fast OutputStream that uploads to S3 from memory.
+   * For MultiPartUploads, as soon as sufficient bytes have been written to
+   * the stream a part is uploaded immediately (by using the low-level
+   * multi-part upload API on the AmazonS3Client).
+   *
+   * @param client AmazonS3Client used for S3 calls
+   * @param fs S3AFilesystem
+   * @param bucket S3 bucket name
+   * @param key S3 key name
+   * @param progress report progress in order to prevent timeouts
+   * @param statistics track FileSystem.Statistics on the performed operations
+   * @param cannedACL used CannedAccessControlList
+   * @param serverSideEncryptionAlgorithm algorithm for server side encryption
+   * @param partSize size of a single part in a multi-part upload (except
+   * last part)
+   * @param multiPartThreshold files at least this size use multi-part upload
+   * @throws IOException
+   */
+  public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
+      String bucket, String key, Progressable progress,
+      FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
+      String serverSideEncryptionAlgorithm, long partSize,
+      long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
+      throws IOException {
+    this.bucket = bucket;
+    this.key = key;
+    this.client = client;
+    this.fs = fs;
+    this.cannedACL = cannedACL;
+    this.statistics = statistics;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+    //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
+    if (partSize > Integer.MAX_VALUE) {
+      this.partSize = Integer.MAX_VALUE;
+      LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
+          "when using 'FAST_UPLOAD = true')");
+    } else {
+      this.partSize = (int) partSize;
+    }
+    if (multiPartThreshold > Integer.MAX_VALUE) {
+      this.multiPartThreshold = Integer.MAX_VALUE;
+      LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
+          "allowed size when using 'FAST_UPLOAD = true')");
+    } else {
+      this.multiPartThreshold = (int) multiPartThreshold;
+    }
+    this.bufferLimit = this.multiPartThreshold;
+    this.closed = false;
+    int initialBufferSize = this.fs.getConf()
+        .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
+    if (initialBufferSize < 0) {
+      LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
+          "default value");
+      initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
+    } else if (initialBufferSize > this.bufferLimit) {
+      LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
+          "exceed MIN_MULTIPART_THRESHOLD");
+      initialBufferSize = this.bufferLimit;
+    }
+    this.buffer = new ByteArrayOutputStream(initialBufferSize);
+    this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
+    this.multiPartUpload = null;
+    this.progressListener = new ProgressableListener(progress);
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
+          bucket, key);
+    }
+  }
+
+  /**
+   * Writes a byte to the memory buffer. If this causes the buffer to reach
+   * its limit, the actual upload is submitted to the threadpool.
+   * @param b the int of which the lowest byte is written
+   * @throws IOException
+   */
+  @Override
+  public synchronized void write(int b) throws IOException {
+    buffer.write(b);
+    if (buffer.size() == bufferLimit) {
+      uploadBuffer();
+    }
+  }
+
+  /**
+   * Writes a range of bytes from the given array to the memory buffer. If
+   * this causes the buffer to reach its limit, the actual upload is submitted
+   * to the threadpool and the remainder of the array is written to memory
+   * (recursively).
+   * @param b byte array containing the bytes to write
+   * @param off offset in array where to start
+   * @param len number of bytes to be written
+   * @throws IOException
+   */
+  @Override
+  public synchronized void write(byte b[], int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    if (buffer.size() + len < bufferLimit) {
+      buffer.write(b, off, len);
+    } else {
+      int firstPart = bufferLimit - buffer.size();
+      buffer.write(b, off, firstPart);
+      uploadBuffer();
+      this.write(b, off + firstPart, len - firstPart);
+    }
+  }
+
+  private synchronized void uploadBuffer() throws IOException {
+    if (multiPartUpload == null) {
+      multiPartUpload = initiateMultiPartUpload();
+       /* Upload the existing buffer if it exceeds partSize. This possibly
+       requires multiple parts! */
+      final byte[] allBytes = buffer.toByteArray();
+      buffer = null; //earlier gc?
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Total length of initial buffer: {}", allBytes.length);
+      }
+      int processedPos = 0;
+      while ((multiPartThreshold - processedPos) >= partSize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initial buffer: processing from byte {} to byte {}",
+              processedPos, (processedPos + partSize - 1));
+        }
+        multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
+            processedPos, partSize), partSize);
+        processedPos += partSize;
+      }
+      //resize and reset stream
+      bufferLimit = partSize;
+      buffer = new ByteArrayOutputStream(bufferLimit);
+      buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
+    } else {
+      //upload next part
+      multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
+          .toByteArray()), partSize);
+      buffer.reset();
+    }
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      if (multiPartUpload == null) {
+        putObject();
+      } else {
+        if (buffer.size() > 0) {
+          //send last part
+          multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
+              .toByteArray()), buffer.size());
+        }
+        final List<PartETag> partETags = multiPartUpload
+            .waitForAllPartUploads();
+        multiPartUpload.complete(partETags);
+      }
+      statistics.incrementWriteOps(1);
+      // This will delete unnecessary fake parent directories
+      fs.finishedWrite(key);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
+      }
+    } finally {
+      buffer = null;
+      super.close();
+    }
+  }
+
+  private ObjectMetadata createDefaultMetadata() {
+    ObjectMetadata om = new ObjectMetadata();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    return om;
+  }
+
+  private MultiPartUpload initiateMultiPartUpload() throws IOException {
+    final ObjectMetadata om = createDefaultMetadata();
+    final InitiateMultipartUploadRequest initiateMPURequest =
+        new InitiateMultipartUploadRequest(bucket, key, om);
+    initiateMPURequest.setCannedACL(cannedACL);
+    try {
+      return new MultiPartUpload(
+          client.initiateMultipartUpload(initiateMPURequest).getUploadId());
+    } catch (AmazonServiceException ase) {
+      throw new IOException("Unable to initiate MultiPartUpload (server side)" +
+          ": " + ase, ase);
+    } catch (AmazonClientException ace) {
+      throw new IOException("Unable to initiate MultiPartUpload (client side)" +
+          ": " + ace, ace);
+    }
+  }
+
+  private void putObject() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
+          key);
+    }
+    final ObjectMetadata om = createDefaultMetadata();
+    om.setContentLength(buffer.size());
+    final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+        new ByteArrayInputStream(buffer.toByteArray()), om);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setGeneralProgressListener(progressListener);
+    ListenableFuture<PutObjectResult> putObjectResult =
+        executorService.submit(new Callable<PutObjectResult>() {
+          @Override
+          public PutObjectResult call() throws Exception {
+            return client.putObject(putObjectRequest);
+          }
+        });
+    //wait for completion
+    try {
+      putObjectResult.get();
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted object upload:" + ie, ie);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException ee) {
+      throw new IOException("Regular upload failed", ee.getCause());
+    }
+  }
+
+  private class MultiPartUpload {
+    private final String uploadId;
+    private final List<ListenableFuture<PartETag>> partETagsFutures;
+
+    public MultiPartUpload(String uploadId) {
+      this.uploadId = uploadId;
+      this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
+            "id '{}'", bucket, key, uploadId);
+      }
+    }
+
+    public void uploadPartAsync(ByteArrayInputStream inputStream,
+        int partSize) {
+      final int currentPartNumber = partETagsFutures.size() + 1;
+      final UploadPartRequest request =
+          new UploadPartRequest().withBucketName(bucket).withKey(key)
+              .withUploadId(uploadId).withInputStream(inputStream)
+              .withPartNumber(currentPartNumber).withPartSize(partSize);
+      request.setGeneralProgressListener(progressListener);
+      ListenableFuture<PartETag> partETagFuture =
+          executorService.submit(new Callable<PartETag>() {
+            @Override
+            public PartETag call() throws Exception {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+                    uploadId);
+              }
+              return client.uploadPart(request).getPartETag();
+            }
+          });
+      partETagsFutures.add(partETagFuture);
+    }
+
+    public List<PartETag> waitForAllPartUploads() throws IOException {
+      try {
+        return Futures.allAsList(partETagsFutures).get();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted partUpload:" + ie, ie);
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException ee) {
+        //there is no way of recovering so abort
+        //cancel all partUploads
+        for (ListenableFuture<PartETag> future : partETagsFutures) {
+          future.cancel(true);
+        }
+        //abort multipartupload
+        this.abort();
+        throw new IOException("Part upload failed in multi-part upload with " +
+            "id '" +uploadId + "':" + ee, ee);
+      }
+      //should not happen?
+      return null;
+    }
+
+    public void complete(List<PartETag> partETags) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
+            uploadId);
+      }
+      final CompleteMultipartUploadRequest completeRequest =
+          new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
+      client.completeMultipartUpload(completeRequest);
+
+    }
+
+    public void abort() {
+      LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
+      try {
+        client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
+            key, uploadId));
+      } catch (Exception e2) {
+        LOG.warn("Unable to abort multipart upload, you may need to purge  " +
+            "uploaded parts: " + e2, e2);
+      }
+    }
+  }
+
+  private static class ProgressableListener implements ProgressListener {
+    private final Progressable progress;
+
+    public ProgressableListener(Progressable progress) {
+      this.progress = progress;
+    }
+
+    public void progressChanged(ProgressEvent progressEvent) {
+      if (progress != null) {
+        progress.progress();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 2373e7e..1a30d6f 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -88,7 +88,8 @@ public class S3AFileSystem extends FileSystem {
   private int maxKeys;
   private long partSize;
   private TransferManager transfers;
-  private int partSizeThreshold;
+  private ThreadPoolExecutor threadPoolExecutor;
+  private int multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
@@ -237,7 +238,7 @@ public class S3AFileSystem extends FileSystem {
 
     maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
     partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, 
+    multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
       DEFAULT_MIN_MULTIPART_THRESHOLD);
 
     if (partSize < 5 * 1024 * 1024) {
@@ -245,9 +246,9 @@ public class S3AFileSystem extends FileSystem {
       partSize = 5 * 1024 * 1024;
     }
 
-    if (partSizeThreshold < 5 * 1024 * 1024) {
+    if (multiPartThreshold < 5 * 1024 * 1024) {
       LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-      partSizeThreshold = 5 * 1024 * 1024;
+      multiPartThreshold = 5 * 1024 * 1024;
     }
 
     int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
@@ -262,20 +263,20 @@ public class S3AFileSystem extends FileSystem {
     LinkedBlockingQueue<Runnable> workQueue =
       new LinkedBlockingQueue<>(maxThreads *
         conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+    threadPoolExecutor = new ThreadPoolExecutor(
         coreThreads,
         maxThreads,
         keepAliveTime,
         TimeUnit.SECONDS,
         workQueue,
         newDaemonThreadFactory("s3a-transfer-shared-"));
-    tpe.allowCoreThreadTimeOut(true);
+    threadPoolExecutor.allowCoreThreadTimeOut(true);
 
     TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
     transferConfiguration.setMinimumUploadPartSize(partSize);
-    transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+    transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
 
-    transfers = new TransferManager(s3, tpe);
+    transfers = new TransferManager(s3, threadPoolExecutor);
     transfers.setConfiguration(transferConfiguration);
 
     String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
@@ -391,7 +392,12 @@ public class S3AFileSystem extends FileSystem {
     if (!overwrite && exists(f)) {
       throw new FileAlreadyExistsException(f + " already exists");
     }
-
+    if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
+      return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
+          key, progress, statistics, cannedACL,
+          serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold,
+          threadPoolExecutor), statistics);
+    }
     // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
     return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
       bucket, key, progress, cannedACL, statistics, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 8e80b92..bf62634 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -213,13 +213,13 @@ If you do any of these: change your credentials immediately!
     <property>
       <name>fs.s3a.connection.establish.timeout</name>
       <value>5000</value>
-      <description>Socket connection setup timeout in seconds.</description>
+      <description>Socket connection setup timeout in milliseconds.</description>
     </property>
 
     <property>
       <name>fs.s3a.connection.timeout</name>
       <value>50000</value>
-      <description>Socket connection timeout in seconds.</description>
+      <description>Socket connection timeout in milliseconds.</description>
     </property>
 
     <property>
@@ -292,7 +292,7 @@ If you do any of these: change your credentials immediately!
       <name>fs.s3a.buffer.dir</name>
       <value>${hadoop.tmp.dir}/s3a</value>
       <description>Comma separated list of directories that will be used to buffer file
-        uploads to.</description>
+        uploads to. No effect if fs.s3a.fast.upload is true.</description>
     </property>
 
     <property>
@@ -301,6 +301,40 @@ If you do any of these: change your credentials immediately!
       <description>The implementation class of the S3A Filesystem</description>
     </property>
 
+### S3AFastOutputStream
+ **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
+
+    <property>
+      <name>fs.s3a.fast.upload</name>
+      <value>false</value>
+      <description>Upload directly from memory instead of buffering to
+      disk first. Memory usage and parallelism can be controlled as up to
+      fs.s3a.multipart.size memory is consumed for each (part)upload actively
+      uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+    </property>
+
+    <property>
+      <name>fs.s3a.fast.buffer.size</name>
+      <value>1048576</value>
+      <description>Size (in bytes) of initial memory buffer allocated for an
+      upload. No effect if fs.s3a.fast.upload is false.</description>
+    </property>
+
+Writes are buffered in memory instead of to a file on local disk. This
+removes the throughput bottleneck of the local disk write and read cycle
+before starting the actual upload. Furthermore, it allows handling files that
+are larger than the remaining local disk space.
+
+However, non-trivial memory tuning is needed for optimal results and careless
+settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
+(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
+additional (part)uploads can be waiting (and thus memory buffers are created).
+The memory buffer is uploaded as a single upload if it is smaller than
+`fs.s3a.multipart.threshold`. Else, a multi-part upload is initiated and
+parts of size `fs.s3a.multipart.size` are used to protect against overflowing
+the available memory. These settings should be tuned to the envisioned
+workflow (some large files, many small ones, ...) and the physical
+limitations of the machine and cluster (memory, network bandwidth).
 
 ## Testing the S3 filesystem clients
 
@@ -334,7 +368,7 @@ each filesystem for its testing.
 The contents of each bucket will be destroyed during the test process:
 do not use the bucket for any purpose other than testing. Furthermore, for
 s3a, all in-progress multi-part uploads to the bucket will be aborted at the
-start of a test (by forcing fs.s3a.multipart.purge=true) to clean up the
+start of a test (by forcing `fs.s3a.multipart.purge=true`) to clean up the
 temporary state of previously failed tests.
 
 Example:
@@ -392,14 +426,14 @@ Example:
 ## File `contract-test-options.xml`
 
 The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
-must be created and configured for the test fileystems.
+must be created and configured for the test filesystems.
 
 If a specific file `fs.contract.test.fs.*` test path is not defined for
 any of the filesystems, those tests will be skipped.
 
 The standard S3 authentication details must also be provided. This can be
 through copy-and-paste of the `auth-keys.xml` credentials, or it can be
-through direct XInclude inclustion.
+through direct XInclude inclusion.
 
 #### s3://
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
new file mode 100644
index 0000000..e507cf6
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests regular and multi-part upload functionality for S3AFastOutputStream.
+ * File sizes are kept small to reduce test duration on slow connections
+ */
+public class TestS3AFastOutputStream {
+  private FileSystem fs;
+
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+    conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(getTestPath(), true);
+    }
+  }
+
+  protected Path getTestPath() {
+    return new Path("/tests3a");
+  }
+
+  @Test
+  public void testRegularUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
+  }
+
+  @Test
+  public void testMultiPartUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 *
+        1024);
+  }
+}

Reply via email to