http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 42178a4..27557f8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -21,20 +21,50 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
-
 import org.slf4j.Logger;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.net.SocketException;
 
-public class S3AInputStream extends FSInputStream {
+/**
+ * The input stream for an S3A object.
+ *
+ * As this stream seeks within an object, it may close then re-open the stream.
+ * When this happens, any updated stream data may be retrieved, and, given
+ * the consistency model of Amazon S3, outdated data may in fact be picked up.
+ *
+ * As a result, the outcome of reading from a stream of an object which is
+ * actively manipulated during the read process is "undefined".
+ *
+ * The class is marked as private as code should not be creating instances
+ * themselves. Any extra feature (e.g instrumentation) should be considered
+ * unstable.
+ *
+ * Because it prints some of the state of the instrumentation,
+ * the output of {@link #toString()} must also be considered unstable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AInputStream extends FSInputStream implements CanSetReadahead {
+  /**
+   * This is the public position; the one set in {@link #seek(long)}
+   * and returned in {@link #getPos()}.
+   */
   private long pos;
-  private boolean closed;
+  /**
+   * Closed bit. Volatile so reads are non-blocking.
+   * Updates must be in a synchronized block to guarantee an atomic check and
+   * set
+   */
+  private volatile boolean closed;
   private S3ObjectInputStream wrappedStream;
   private final FileSystem.Statistics stats;
   private final AmazonS3Client client;
@@ -44,62 +74,65 @@ public class S3AInputStream extends FSInputStream {
   private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
   public static final long CLOSE_THRESHOLD = 4096;
+  private final S3AInstrumentation.InputStreamStatistics streamStatistics;
+  private long readahead;
 
-  // Used by lazy seek
+  /**
+   * This is the actual position within the object, used by
+   * lazy seek to decide whether to seek on the next read or not.
+   */
   private long nextReadPos;
 
-  //Amount of data requested from the request
+  /* Amount of data desired from the request */
   private long requestedStreamLen;
 
-  public S3AInputStream(String bucket, String key, long contentLength,
-      AmazonS3Client client, FileSystem.Statistics stats) {
+  public S3AInputStream(String bucket,
+      String key,
+      long contentLength,
+      AmazonS3Client client,
+      FileSystem.Statistics stats,
+      S3AInstrumentation instrumentation,
+      long readahead) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
+    Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
     this.bucket = bucket;
     this.key = key;
     this.contentLength = contentLength;
     this.client = client;
     this.stats = stats;
-    this.pos = 0;
-    this.nextReadPos = 0;
-    this.closed = false;
-    this.wrappedStream = null;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
+    this.streamStatistics = instrumentation.newInputStreamStatistics();
+    setReadahead(readahead);
   }
 
   /**
    * Opens up the stream at specified target position and for given length.
    *
+   * @param reason reason for reopen
    * @param targetPos target position
    * @param length length requested
    * @throws IOException
    */
-  private synchronized void reopen(long targetPos, long length)
+  private synchronized void reopen(String reason, long targetPos, long length)
       throws IOException {
-    requestedStreamLen = (length < 0) ? this.contentLength :
-        Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length)));
+    requestedStreamLen = this.contentLength;
 
     if (wrappedStream != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Closing the previous stream");
-      }
-      closeStream(requestedStreamLen);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Requesting for "
-          + "targetPos=" + targetPos
-          + ", length=" + length
-          + ", requestedStreamLen=" + requestedStreamLen
-          + ", streamPosition=" + pos
-          + ", nextReadPosition=" + nextReadPos
-      );
+      closeStream("reopen(" + reason + ")", requestedStreamLen);
     }
+    LOG.debug("reopen({}) for {} at targetPos={}, length={}," +
+        " requestedStreamLen={}, streamPosition={}, nextReadPosition={}",
+        uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
 
+    streamStatistics.streamOpened();
     GetObjectRequest request = new GetObjectRequest(bucket, key)
         .withRange(targetPos, requestedStreamLen);
     wrappedStream = client.getObject(request).getObjectContent();
 
     if (wrappedStream == null) {
-      throw new IOException("Null IO stream");
+      throw new IOException("Null IO stream from reopen of (" + reason +  ") "
+          + uri);
     }
 
     this.pos = targetPos;
@@ -129,6 +162,20 @@ public class S3AInputStream extends FSInputStream {
   }
 
   /**
+   * Seek without raising any exception. This is for use in
+   * {@code finally} clauses
+   * @param positiveTargetPos a target position which must be positive.
+   */
+  private void seekQuietly(long positiveTargetPos) {
+    try {
+      seek(positiveTargetPos);
+    } catch (IOException ioe) {
+      LOG.debug("Ignoring IOE on seek of {} to {}",
+          uri, positiveTargetPos, ioe);
+    }
+  }
+
+  /**
    * Adjust the stream to a specific position.
    *
    * @param targetPos target seek position
@@ -140,23 +187,50 @@ public class S3AInputStream extends FSInputStream {
     if (wrappedStream == null) {
       return;
     }
-
     // compute how much more to skip
     long diff = targetPos - pos;
-    if (targetPos > pos) {
-      if ((diff + length) <= wrappedStream.available()) {
-        // already available in buffer
-        pos += wrappedStream.skip(diff);
-        if (pos != targetPos) {
-          throw new IOException("Failed to seek to " + targetPos
-              + ". Current position " + pos);
+    if (diff > 0) {
+      // forward seek -this is where data can be skipped
+
+      int available = wrappedStream.available();
+      // always seek at least as far as what is available
+      long forwardSeekRange = Math.max(readahead, available);
+      // work out how much is actually left in the stream
+      // then choose whichever comes first: the range or the EOF
+      long forwardSeekLimit = Math.min(remaining(), forwardSeekRange);
+      if (diff <= forwardSeekLimit) {
+        // the forward seek range is within the limits
+        LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
+        streamStatistics.seekForwards(diff);
+        long skipped = wrappedStream.skip(diff);
+        if (skipped > 0) {
+          pos += skipped;
+          // as these bytes have been read, they are included in the counter
+          incrementBytesRead(diff);
+        }
+
+        if (pos == targetPos) {
+          // all is well
+          return;
+        } else {
+          // log a warning; continue to attempt to re-open
+          LOG.warn("Failed to seek on {} to {}. Current position {}",
+              uri, targetPos,  pos);
         }
-        return;
       }
+    } else if (diff < 0) {
+      // backwards seek
+      streamStatistics.seekBackwards(diff);
+    } else {
+      // targetPos == pos
+      // this should never happen as the caller filters it out.
+      // Retained just in case
+      LOG.debug("Ignoring seek {} to {} as target position == current",
+          uri, targetPos);
     }
 
     // close the stream; if read the object will be opened at the new pos
-    closeStream(this.requestedStreamLen);
+    closeStream("seekInStream()", this.requestedStreamLen);
     pos = targetPos;
   }
 
@@ -179,7 +253,19 @@ public class S3AInputStream extends FSInputStream {
 
     //re-open at specific location if needed
     if (wrappedStream == null) {
-      reopen(targetPos, len);
+      reopen("read from new offset", targetPos, len);
+    }
+  }
+
+  /**
+   * Increment the bytes read counter if there is a stats instance
+   * and the number of bytes read is more than zero.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    streamStatistics.bytesRead(bytesRead);
+    if (stats != null && bytesRead > 0) {
+      stats.incrementBytesRead(bytesRead);
     }
   }
 
@@ -195,13 +281,11 @@ public class S3AInputStream extends FSInputStream {
     int byteRead;
     try {
       byteRead = wrappedStream.read();
-    } catch (SocketTimeoutException | SocketException e) {
-      LOG.info("Got exception while trying to read from stream,"
-          + " trying to recover " + e);
-      reopen(pos, 1);
-      byteRead = wrappedStream.read();
     } catch (EOFException e) {
       return -1;
+    } catch (IOException e) {
+      onReadFailure(e, 1);
+      byteRead = wrappedStream.read();
     }
 
     if (byteRead >= 0) {
@@ -209,12 +293,36 @@ public class S3AInputStream extends FSInputStream {
       nextReadPos++;
     }
 
-    if (stats != null && byteRead >= 0) {
-      stats.incrementBytesRead(1);
+    if (byteRead >= 0) {
+      incrementBytesRead(1);
     }
     return byteRead;
   }
 
+  /**
+   * Handle an IOE on a read by attempting to re-open the stream.
+   * The filesystem's readException count will be incremented.
+   * @param ioe exception caught.
+   * @param length length of data being attempted to read
+   * @throws IOException any exception thrown on the re-open attempt.
+   */
+  private void onReadFailure(IOException ioe, int length) throws IOException {
+    LOG.info("Got exception while trying to read from stream {}"
+        + " trying to recover: "+ ioe, uri);
+    LOG.debug("While trying to read from stream {}", uri, ioe);
+    streamStatistics.readException();
+    reopen("failure recovery", pos, length);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * This updates the statistics on read operations started and whether
+   * or not the read operation "completed", that is: returned the exact
+   * number of bytes requested.
+   * @throws EOFException if there is no more data
+   * @throws IOException if there are other problems
+   */
   @Override
   public synchronized int read(byte[] buf, int off, int len)
       throws IOException {
@@ -230,61 +338,85 @@ public class S3AInputStream extends FSInputStream {
     }
 
     lazySeek(nextReadPos, len);
+    streamStatistics.readOperationStarted(nextReadPos, len);
 
-    int byteRead;
+    int bytesRead;
     try {
-      byteRead = wrappedStream.read(buf, off, len);
-    } catch (SocketTimeoutException | SocketException e) {
-      LOG.info("Got exception while trying to read from stream,"
-          + " trying to recover " + e);
-      reopen(pos, len);
-      byteRead = wrappedStream.read(buf, off, len);
-    }
-
-    if (byteRead > 0) {
-      pos += byteRead;
-      nextReadPos += byteRead;
+      bytesRead = wrappedStream.read(buf, off, len);
+    } catch (EOFException e) {
+      throw e;
+    } catch (IOException e) {
+      onReadFailure(e, len);
+      bytesRead = wrappedStream.read(buf, off, len);
     }
 
-    if (stats != null && byteRead > 0) {
-      stats.incrementBytesRead(byteRead);
+    if (bytesRead > 0) {
+      pos += bytesRead;
+      nextReadPos += bytesRead;
     }
-
-    return byteRead;
+    incrementBytesRead(bytesRead);
+    streamStatistics.readOperationCompleted(len, bytesRead);
+    return bytesRead;
   }
 
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
   private void checkNotClosed() throws IOException {
     if (closed) {
-      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
 
+  /**
+   * Close the stream.
+   * This triggers publishing of the stream statistics back to the filesystem
+   * statistics.
+   * This operation is synchronized, so that only one thread can attempt to
+   * close the connection; all later/blocked calls are no-ops.
+   * @throws IOException on any problem
+   */
   @Override
   public synchronized void close() throws IOException {
-    super.close();
-    closed = true;
-    closeStream(this.contentLength);
+    if (!closed) {
+      closed = true;
+      try {
+        // close or abort the stream
+        closeStream("close() operation", this.contentLength);
+        // this is actually a no-op
+        super.close();
+      } finally {
+        // merge the statistics back into the FS statistics.
+        streamStatistics.close();
+      }
+    }
   }
 
   /**
    * Close a stream: decide whether to abort or close, based on
    * the length of the stream and the current position.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
    *
    * This does not set the {@link #closed} flag.
+   *
+   * @param reason reason for stream being closed; used in messages
    * @param length length of the stream.
-   * @throws IOException
    */
-  private void closeStream(long length) throws IOException {
+  private void closeStream(String reason, long length) {
     if (wrappedStream != null) {
-      String reason = null;
       boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
       if (!shouldAbort) {
         try {
-          reason = "Closed stream";
+          // clean close. This will read to the end of the stream,
+          // so, while cleaner, can be pathological on a multi-GB object
           wrappedStream.close();
+          streamStatistics.streamClose(false);
         } catch (IOException e) {
           // exception escalates to an abort
-          LOG.debug("When closing stream", e);
+          LOG.debug("When closing {} stream for {}", uri, reason, e);
           shouldAbort = true;
         }
       }
@@ -292,13 +424,12 @@ public class S3AInputStream extends FSInputStream {
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
         wrappedStream.abort();
-        reason = "Closed stream with abort";
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(reason + "; streamPos=" + pos
-            + ", nextReadPos=" + nextReadPos
-            + ", contentLength=" + length);
+        streamStatistics.streamClose(true);
       }
+      LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
+          " length={}",
+          uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos,
+          length);
       wrappedStream = null;
     }
   }
@@ -307,19 +438,34 @@ public class S3AInputStream extends FSInputStream {
   public synchronized int available() throws IOException {
     checkNotClosed();
 
-    long remaining = this.contentLength - this.pos;
+    long remaining = remaining();
     if (remaining > Integer.MAX_VALUE) {
       return Integer.MAX_VALUE;
     }
     return (int)remaining;
   }
 
+  /**
+   * Bytes left in stream.
+   * @return how many bytes are left to read
+   */
+  protected long remaining() {
+    return this.contentLength - this.pos;
+  }
+
   @Override
   public boolean markSupported() {
     return false;
   }
 
+  /**
+   * String value includes statistics as well as stream state.
+   * <b>Important: there are no guarantees as to the stability
+   * of this value.</b>
+   * @return a string value for printing in logs/diagnostics
+   */
   @Override
+  @InterfaceStability.Unstable
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "S3AInputStream{");
@@ -327,6 +473,7 @@ public class S3AInputStream extends FSInputStream {
     sb.append(" pos=").append(pos);
     sb.append(" nextReadPos=").append(nextReadPos);
     sb.append(" contentLength=").append(contentLength);
+    sb.append(" ").append(streamStatistics.toString());
     sb.append('}');
     return sb.toString();
   }
@@ -348,6 +495,7 @@ public class S3AInputStream extends FSInputStream {
       throws IOException {
     checkNotClosed();
     validatePositionedReadArgs(position, buffer, offset, length);
+    streamStatistics.readFullyOperationStarted(position, length);
     if (length == 0) {
       return;
     }
@@ -363,10 +511,38 @@ public class S3AInputStream extends FSInputStream {
           }
           nread += nbytes;
         }
-
       } finally {
-        seek(oldPos);
+        seekQuietly(oldPos);
       }
     }
   }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInstrumentation.InputStreamStatistics getS3AStreamStatistics() {
+    return streamStatistics;
+  }
+
+  @Override
+  public void setReadahead(Long readahead) {
+    if (readahead == null) {
+      this.readahead = Constants.DEFAULT_READAHEAD_RANGE;
+    } else {
+      Preconditions.checkArgument(readahead >= 0, "Negative readahead value");
+      this.readahead = readahead;
+    }
+  }
+
+  /**
+   * Get the current readahead value.
+   * @return a non-negative readahead value
+   */
+  public long getReadahead() {
+    return readahead;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
new file mode 100644
index 0000000..285f228
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricStringBuilder;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Instrumentation of S3a.
+ * Derived from the {@code AzureFileSystemInstrumentation}
+ */
+@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AInstrumentation {
+  public static final String CONTEXT = "S3AFileSystem";
+
+  public static final String STREAM_OPENED = "streamOpened";
+  public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
+  public static final String STREAM_CLOSED = "streamClosed";
+  public static final String STREAM_ABORTED = "streamAborted";
+  public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
+  public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
+  public static final String STREAM_FORWARD_SEEK_OPERATIONS
+      = "streamForwardSeekOperations";
+  public static final String STREAM_BACKWARD_SEEK_OPERATIONS
+      = "streamBackwardSeekOperations";
+  public static final String STREAM_SEEK_BYTES_SKIPPED =
+      "streamBytesSkippedOnSeek";
+  public static final String STREAM_SEEK_BYTES_BACKWARDS =
+      "streamBytesBackwardsOnSeek";
+  public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
+  public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
+  public static final String STREAM_READ_FULLY_OPERATIONS
+      = "streamReadFullyOperations";
+  public static final String STREAM_READ_OPERATIONS_INCOMPLETE
+      = "streamReadOperationsIncomplete";
+  public static final String FILES_CREATED = "files_created";
+  public static final String FILES_COPIED = "files_copied";
+  public static final String FILES_COPIED_BYTES = "files_copied_bytes";
+  public static final String FILES_DELETED = "files_deleted";
+  public static final String DIRECTORIES_CREATED = "directories_created";
+  public static final String DIRECTORIES_DELETED = "directories_deleted";
+  public static final String IGNORED_ERRORS = "ignored_errors";
+  private final MetricsRegistry registry =
+      new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
+  private final MutableCounterLong streamOpenOperations;
+  private final MutableCounterLong streamCloseOperations;
+  private final MutableCounterLong streamClosed;
+  private final MutableCounterLong streamAborted;
+  private final MutableCounterLong streamSeekOperations;
+  private final MutableCounterLong streamReadExceptions;
+  private final MutableCounterLong streamForwardSeekOperations;
+  private final MutableCounterLong streamBackwardSeekOperations;
+  private final MutableCounterLong streamBytesSkippedOnSeek;
+  private final MutableCounterLong streamBytesBackwardsOnSeek;
+  private final MutableCounterLong streamBytesRead;
+  private final MutableCounterLong streamReadOperations;
+  private final MutableCounterLong streamReadFullyOperations;
+  private final MutableCounterLong streamReadsIncomplete;
+  private final MutableCounterLong ignoredErrors;
+
+  private final MutableCounterLong numberOfFilesCreated;
+  private final MutableCounterLong numberOfFilesCopied;
+  private final MutableCounterLong bytesOfFilesCopied;
+  private final MutableCounterLong numberOfFilesDeleted;
+  private final MutableCounterLong numberOfDirectoriesCreated;
+  private final MutableCounterLong numberOfDirectoriesDeleted;
+  private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
+
+  public S3AInstrumentation(URI name) {
+    UUID fileSystemInstanceId = UUID.randomUUID();
+    registry.tag("FileSystemId",
+        "A unique identifier for the FS ",
+        fileSystemInstanceId.toString() + "-" + name.getHost());
+    registry.tag("fsURI",
+        "URI of this filesystem",
+        name.toString());
+    streamOpenOperations = streamCounter(STREAM_OPENED,
+        "Total count of times an input stream to object store was opened");
+    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
+        "Total count of times an attempt to close a data stream was made");
+    streamClosed = streamCounter(STREAM_CLOSED,
+        "Count of times the TCP stream was closed");
+    streamAborted = streamCounter(STREAM_ABORTED,
+        "Count of times the TCP stream was aborted");
+    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
+        "Number of seek operations invoked on input streams");
+    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
+        "Number of read exceptions caught and attempted to recovered from");
+    streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
+        "Number of executed seek operations which went forward in a stream");
+    streamBackwardSeekOperations = streamCounter(
+        STREAM_BACKWARD_SEEK_OPERATIONS,
+        "Number of executed seek operations which went backwards in a stream");
+    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
+        "Count of bytes skipped during forward seek operations");
+    streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
+        "Count of bytes moved backwards during seek operations");
+    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
+        "Count of bytes read during seek() in stream operations");
+    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
+        "Count of read() operations in streams");
+    streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
+        "Count of readFully() operations in streams");
+    streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
+        "Count of incomplete read() operations in streams");
+
+    numberOfFilesCreated = counter(FILES_CREATED,
+            "Total number of files created through the object store.");
+    numberOfFilesCopied = counter(FILES_COPIED,
+            "Total number of files copied within the object store.");
+    bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
+            "Total number of bytes copied within the object store.");
+    numberOfFilesDeleted = counter(FILES_DELETED,
+            "Total number of files deleted through from the object store.");
+    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
+        "Total number of directories created through the object store.");
+    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
+        "Total number of directories deleted through the object store.");
+    ignoredErrors = counter(IGNORED_ERRORS,
+        "Total number of errors caught and ingored.");
+  }
+
+  /**
+   * Create a counter in the registry.
+   * @param name counter name
+   * @param desc counter description
+   * @return a new counter
+   */
+  protected final MutableCounterLong counter(String name, String desc) {
+    return registry.newCounter(name, desc, 0L);
+  }
+
+  /**
+   * Create a counter in the stream map: these are unregistered in the public
+   * metrics.
+   * @param name counter name
+   * @param desc counter description
+   * @return a new counter
+   */
+  protected final MutableCounterLong streamCounter(String name, String desc) {
+    MutableCounterLong counter = new MutableCounterLong(
+        Interns.info(name, desc), 0L);
+    streamMetrics.put(name, counter);
+    return counter;
+  }
+
+  /**
+   * Create a gauge in the registry.
+   * @param name gauge name
+   * @param desc description
+   * @return the gauge
+   */
+  protected final MutableGaugeLong gauge(String name, String desc) {
+    return registry.newGauge(name, desc, 0L);
+  }
+
+  /**
+   * Get the metrics registry.
+   * @return the registry
+   */
+  public MetricsRegistry getRegistry() {
+    return registry;
+  }
+
+  /**
+   * Dump all the metrics to a string.
+   * @param prefix prefix before every entry
+   * @param separator separator between name and value
+   * @param suffix suffix
+   * @param all get all the metrics even if the values are not changed.
+   * @return a string dump of the metrics
+   */
+  public String dump(String prefix,
+      String separator,
+      String suffix,
+      boolean all) {
+    MetricStringBuilder metricBuilder = new MetricStringBuilder(null,
+        prefix,
+        separator, suffix);
+    registry.snapshot(metricBuilder, all);
+    for (Map.Entry<String, MutableCounterLong> entry:
+        streamMetrics.entrySet()) {
+      metricBuilder.tuple(entry.getKey(),
+          Long.toString(entry.getValue().value()));
+    }
+    return metricBuilder.toString();
+  }
+
+  /**
+   * Indicate that S3A created a file.
+   */
+  public void fileCreated() {
+    numberOfFilesCreated.incr();
+  }
+
+  /**
+   * Indicate that S3A deleted one or more files.
+   * @param count number of files.
+   */
+  public void fileDeleted(int count) {
+    numberOfFilesDeleted.incr(count);
+  }
+
+  /**
+   * Indicate that S3A created a directory.
+   */
+  public void directoryCreated() {
+    numberOfDirectoriesCreated.incr();
+  }
+
+  /**
+   * Indicate that S3A just deleted a directory.
+   */
+  public void directoryDeleted() {
+    numberOfDirectoriesDeleted.incr();
+  }
+
+  /**
+   * Indicate that S3A copied some files within the store.
+   *
+   * @param files number of files
+   * @param size total size in bytes
+   */
+  public void filesCopied(int files, long size) {
+    numberOfFilesCopied.incr(files);
+    bytesOfFilesCopied.incr(size);
+  }
+
+  /**
+   * Note that an error was ignored.
+   */
+  public void errorIgnored() {
+    ignoredErrors.incr();
+  }
+
+  /**
+   * Create a stream input statistics instance.
+   * @return the new instance
+   */
+  InputStreamStatistics newInputStreamStatistics() {
+    return new InputStreamStatistics();
+  }
+
+  /**
+   * Merge in the statistics of a single input stream into
+   * the filesystem-wide statistics.
+   * @param statistics stream statistics
+   */
+  private void mergeInputStreamStatistics(InputStreamStatistics statistics) {
+    streamOpenOperations.incr(statistics.openOperations);
+    streamCloseOperations.incr(statistics.closeOperations);
+    streamClosed.incr(statistics.closed);
+    streamAborted.incr(statistics.aborted);
+    streamSeekOperations.incr(statistics.seekOperations);
+    streamReadExceptions.incr(statistics.readExceptions);
+    streamForwardSeekOperations.incr(statistics.forwardSeekOperations);
+    streamBytesSkippedOnSeek.incr(statistics.bytesSkippedOnSeek);
+    streamBackwardSeekOperations.incr(statistics.backwardSeekOperations);
+    streamBytesBackwardsOnSeek.incr(statistics.bytesBackwardsOnSeek);
+    streamBytesRead.incr(statistics.bytesRead);
+    streamReadOperations.incr(statistics.readOperations);
+    streamReadFullyOperations.incr(statistics.readFullyOperations);
+    streamReadsIncomplete.incr(statistics.readsIncomplete);
+  }
+
+  /**
+   * Statistics updated by an input stream during its actual operation.
+   * These counters are not thread-safe and are for use in a single instance
+   * of a stream.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public final class InputStreamStatistics implements AutoCloseable {
+    public long openOperations;
+    public long closeOperations;
+    public long closed;
+    public long aborted;
+    public long seekOperations;
+    public long readExceptions;
+    public long forwardSeekOperations;
+    public long backwardSeekOperations;
+    public long bytesRead;
+    public long bytesSkippedOnSeek;
+    public long bytesBackwardsOnSeek;
+    public long readOperations;
+    public long readFullyOperations;
+    public long readsIncomplete;
+
+    private InputStreamStatistics() {
+    }
+
+    /**
+     * Seek backwards, incrementing the seek and backward seek counters.
+     * @param negativeOffset how far was the seek?
+     * This is expected to be negative.
+     */
+    public void seekBackwards(long negativeOffset) {
+      seekOperations++;
+      backwardSeekOperations++;
+      bytesBackwardsOnSeek -= negativeOffset;
+    }
+
+    /**
+     * Record a forward seek, adding a seek operation, a forward
+     * seek operation, and any bytes skipped.
+     * @param skipped number of bytes skipped by reading from the stream.
+     * If the seek was implemented by a close + reopen, set this to zero.
+     */
+    public void seekForwards(long skipped) {
+      seekOperations++;
+      forwardSeekOperations++;
+      if (skipped > 0) {
+        bytesSkippedOnSeek += skipped;
+      }
+    }
+
+    /**
+     * The inner stream was opened.
+     */
+    public void streamOpened() {
+      openOperations++;
+    }
+
+    /**
+     * The inner stream was closed.
+     * @param abortedConnection flag to indicate the stream was aborted,
+     * rather than closed cleanly
+     */
+    public void streamClose(boolean abortedConnection) {
+      closeOperations++;
+      if (abortedConnection) {
+        this.aborted++;
+      } else {
+        closed++;
+      }
+    }
+
+    /**
+     * An ignored stream read exception was received.
+     */
+    public void readException() {
+      readExceptions++;
+    }
+
+    /**
+     * Increment the bytes read counter by the number of bytes;
+     * no-op if the argument is negative.
+     * @param bytes number of bytes read
+     */
+    public void bytesRead(long bytes) {
+      if (bytes > 0) {
+        bytesRead += bytes;
+      }
+    }
+
+    /**
+     * A {@code read(byte[] buf, int off, int len)} operation has started.
+     * @param pos starting position of the read
+     * @param len length of bytes to read
+     */
+    public void readOperationStarted(long pos, long len) {
+      readOperations++;
+    }
+
+    /**
+     * A {@code PositionedRead.read(position, buffer, offset, length)}
+     * operation has just started.
+     * @param pos starting position of the read
+     * @param len length of bytes to read
+     */
+    public void readFullyOperationStarted(long pos, long len) {
+      readFullyOperations++;
+    }
+
+    /**
+     * A read operation has completed.
+     * @param requested number of requested bytes
+     * @param actual the actual number of bytes
+     */
+    public void readOperationCompleted(int requested, int actual) {
+      if (requested > actual) {
+        readsIncomplete++;
+      }
+    }
+
+    /**
+     * Close triggers the merge of statistics into the filesystem's
+     * instrumentation instance.
+     */
+    @Override
+    public void close() {
+      mergeInputStreamStatistics(this);
+    }
+
+    /**
+     * String operator describes all the current statistics.
+     * <b>Important: there are no guarantees as to the stability
+     * of this value.</b>
+     * @return the current values of the stream statistics.
+     */
+    @Override
+    @InterfaceStability.Unstable
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "StreamStatistics{");
+      sb.append("OpenOperations=").append(openOperations);
+      sb.append(", CloseOperations=").append(closeOperations);
+      sb.append(", Closed=").append(closed);
+      sb.append(", Aborted=").append(aborted);
+      sb.append(", SeekOperations=").append(seekOperations);
+      sb.append(", ReadExceptions=").append(readExceptions);
+      sb.append(", ForwardSeekOperations=")
+          .append(forwardSeekOperations);
+      sb.append(", BackwardSeekOperations=")
+          .append(backwardSeekOperations);
+      sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
+      sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
+      sb.append(", BytesRead=").append(bytesRead);
+      sb.append(", BytesRead excluding skipped=")
+          .append(bytesRead - bytesSkippedOnSeek);
+      sb.append(", ReadOperations=").append(readOperations);
+      sb.append(", ReadFullyOperations=").append(readFullyOperations);
+      sb.append(", ReadsIncomplete=").append(readsIncomplete);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 3e079f2..f9ff701 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -21,14 +21,14 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
 import com.amazonaws.services.s3.transfer.Upload;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -46,6 +46,11 @@ import static 
com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
 import static 
com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 
+/**
+ * Output stream to save data to S3.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class S3AOutputStream extends OutputStream {
   private OutputStream backupStream;
   private File backupFile;
@@ -65,9 +70,9 @@ public class S3AOutputStream extends OutputStream {
   public static final Logger LOG = S3AFileSystem.LOG;
 
   public S3AOutputStream(Configuration conf, TransferManager transfers,
-    S3AFileSystem fs, String bucket, String key, Progressable progress, 
-    CannedAccessControlList cannedACL, FileSystem.Statistics statistics, 
-    String serverSideEncryptionAlgorithm)
+      S3AFileSystem fs, String bucket, String key, Progressable progress,
+      CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
+      String serverSideEncryptionAlgorithm)
       throws IOException {
     this.bucket = bucket;
     this.key = key;
@@ -78,9 +83,8 @@ public class S3AOutputStream extends OutputStream {
     this.statistics = statistics;
     this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
 
-    partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-        DEFAULT_MIN_MULTIPART_THRESHOLD);
+    partSize = fs.getPartitionSize();
+    partSizeThreshold = fs.getMultiPartThreshold();
 
     if (conf.get(BUFFER_DIR, null) != null) {
       lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
@@ -91,10 +95,8 @@ public class S3AOutputStream extends OutputStream {
     backupFile = lDirAlloc.createTmpFileForWrite("output-", 
LocalDirAllocator.SIZE_UNKNOWN, conf);
     closed = false;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " +
-                this.backupFile);
-    }
+    LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
+        key, backupFile);
 
     this.backupStream = new BufferedOutputStream(new 
FileOutputStream(backupFile));
   }
@@ -111,10 +113,9 @@ public class S3AOutputStream extends OutputStream {
     }
 
     backupStream.close();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("OutputStream for key '" + key + "' closed. Now beginning 
upload");
-      LOG.debug("Minimum upload part size: " + partSize + " threshold " + 
partSizeThreshold);
-    }
+    LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
+    LOG.debug("Minimum upload part size: {} threshold {}" , partSize,
+        partSizeThreshold);
 
 
     try {
@@ -129,7 +130,7 @@ public class S3AOutputStream extends OutputStream {
       Upload upload = transfers.upload(putObjectRequest);
 
       ProgressableProgressListener listener = 
-        new ProgressableProgressListener(upload, progress, statistics);
+          new ProgressableProgressListener(upload, progress, statistics);
       upload.addProgressListener(listener);
 
       upload.waitForUploadResult();
@@ -168,6 +169,9 @@ public class S3AOutputStream extends OutputStream {
     backupStream.write(b, off, len);
   }
 
+  /**
+   * Listener to progress from AWS regarding transfers.
+   */
   public static class ProgressableProgressListener implements ProgressListener 
{
     private Progressable progress;
     private FileSystem.Statistics statistics;
@@ -175,7 +179,7 @@ public class S3AOutputStream extends OutputStream {
     private Upload upload;
 
     public ProgressableProgressListener(Upload upload, Progressable progress, 
-      FileSystem.Statistics statistics) {
+        FileSystem.Statistics statistics) {
       this.upload = upload;
       this.progress = progress;
       this.statistics = statistics;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index e6fbe89..2259200 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -434,6 +434,14 @@ this capability.
       <description>The implementation class of the S3A 
AbstractFileSystem.</description>
     </property>
 
+    <property>
+      <name>fs.s3a.readahead.range</name>
+      <value>65536</value>
+      <description>Bytes to read ahead during a seek() before closing and
+      re-opening the S3 HTTP connection. This option will be overridden if
+      any call to setReadahead() is made to an open stream.</description>
+    </property>
+
 ### S3AFastOutputStream
  **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
 
@@ -644,7 +652,7 @@ Example:
     <configuration>
     
       <include xmlns="http://www.w3.org/2001/XInclude"
-        href="auth-keys.xml"/>
+        href="/home/testuser/.ssh/auth-keys.xml"/>
     
       <property>
         <name>fs.contract.test.fs.s3</name>
@@ -664,7 +672,61 @@ Example:
 
     </configuration>
 
-This example pulls in the `auth-keys.xml` file for the credentials. 
+This example pulls in the `~/.ssh/auth-keys.xml` file for the credentials.
 This provides one single place to keep the keys up to date —and means
 that the file `contract-test-options.xml` does not contain any
-secret credentials itself.
+secret credentials itself. As the auth keys XML file is kept out of the
+source code tree, it is not going to get accidentally committed.
+
+### Running Performance Tests against non-AWS storage infrastructures
+
+
+#### CSV Data source
+
+The `TestS3AInputStreamPerformance` tests require read access to a multi-MB
+text file. The default file for these tests is one published by Amazon,
+[s3a://landsat-pds/scene_list.gz](http://landsat-pds.s3.amazonaws.com/scene_list.gz).
+This is a gzipped CSV index of other files which Amazon serves for open use.
+
+The path to this object is set in the option `fs.s3a.scale.test.csvfile`:
+
+    <property>
+      <name>fs.s3a.scale.test.csvfile</name>
+      <value>s3a://landsat-pds/scene_list.gz</value>
+    </property>
+
+1. If the option is not overridden, the default value is used. This
+is hosted in Amazon's US-east datacenter.
+1. If the property is empty, tests which require it will be skipped.
+1. If the data cannot be read for any reason then the test will fail.
+1. If the property is set to a different path, then that data must be readable
+and "sufficiently" large.
+
+To test on different S3 endpoints, or alternate infrastructures supporting
+the same APIs, either the option `fs.s3a.scale.test.csvfile` must be
+set to " ", or an object of at least 10MB must be uploaded to the object
+store and the `fs.s3a.scale.test.csvfile` option set to its path.
+
+      <property>
+        <name>fs.s3a.scale.test.csvfile</name>
+        <value> </value>
+      </property>
+
+
+#### Scale test operation count
+
+Some scale tests perform multiple operations (such as creating many directories).
+
+The exact number of operations to perform is configurable in the option
+`scale.test.operation.count`
+
+      <property>
+        <name>scale.test.operation.count</name>
+        <value>10</value>
+      </property>
+
+Larger values generate more load, and are recommended when testing locally,
+or in batch runs.
+
+Smaller values should result in faster test runs, especially when the object
+store is a long way away.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index e44a90e..42c552a 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -19,18 +19,25 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URI;
+import java.io.InputStream;
+import java.util.Locale;
 
 import static org.junit.Assume.assumeTrue;
 
@@ -38,30 +45,61 @@ import static org.junit.Assume.assumeTrue;
  * Base class for scale tests; here is where the common scale configuration
  * keys are defined.
  */
-public class S3AScaleTestBase {
+public class S3AScaleTestBase extends Assert {
 
   public static final String SCALE_TEST = "scale.test.";
 
+  public static final String S3A_SCALE_TEST = "fs.s3a.scale.test.";
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  @BeforeClass
+  public static void nameThread() {
+    Thread.currentThread().setName("JUnit");
+  }
+
   /**
-   * The number of operations to perform: {@value}
+   * The number of operations to perform: {@value}.
    */
   public static final String KEY_OPERATION_COUNT =
       SCALE_TEST + "operation.count";
 
   /**
-   * The default number of operations to perform: {@value}
+   * The readahead buffer: {@value}.
+   */
+  public static final String KEY_READ_BUFFER_SIZE =
+      S3A_SCALE_TEST + "read.buffer.size";
+
+  public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
+
+  /**
+   * Key for a multi MB test file: {@value}.
+   */
+  public static final String KEY_CSVTEST_FILE =
+      S3A_SCALE_TEST + "csvfile";
+
+  /**
+   * Default path for the multi MB test file: {@value}.
+   */
+  public static final String DEFAULT_CSVTEST_FILE
+      = "s3a://landsat-pds/scene_list.gz";
+
+  /**
+   * The default number of operations to perform: {@value}.
    */
   public static final long DEFAULT_OPERATION_COUNT = 2005;
 
   protected S3AFileSystem fs;
-  private static final Logger LOG =
+
+  protected static final Logger LOG =
       LoggerFactory.getLogger(S3AScaleTestBase.class);
 
   private Configuration conf;
 
   /**
    * Configuration generator. May be overridden to inject
-   * some custom options
+   * some custom options.
    * @return a configuration with which to create FS instances
    */
   protected Configuration createConfiguration() {
@@ -69,7 +107,7 @@ public class S3AScaleTestBase {
   }
 
   /**
-   * Get the configuration used to set up the FS
+   * Get the configuration used to set up the FS.
    * @return the configuration
    */
   public Configuration getConf() {
@@ -79,7 +117,7 @@ public class S3AScaleTestBase {
   @Before
   public void setUp() throws Exception {
     conf = createConfiguration();
-    LOG.info("Scale test operation count = {}", getOperationCount());
+    LOG.debug("Scale test operation count = {}", getOperationCount());
     fs = S3ATestUtils.createTestFileSystem(conf);
   }
 
@@ -95,4 +133,139 @@ public class S3AScaleTestBase {
   protected long getOperationCount() {
     return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT);
   }
+
+  /**
+   * Describe a test in the logs
+   * @param text text to print
+   * @param args arguments to format in the printing
+   */
+  protected void describe(String text, Object... args) {
+    LOG.info("\n\n{}: {}\n",
+        methodName.getMethodName(),
+        String.format(text, args));
+  }
+
+  /**
+   * Get the input stream statistics of an input stream.
+   * Raises an exception if the inner stream is not an S3A input stream
+   * @param in wrapper
+   * @return the statistics for the inner stream
+   */
+  protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
+      FSDataInputStream in) {
+    InputStream inner = in.getWrappedStream();
+    if (inner instanceof S3AInputStream) {
+      S3AInputStream s3a = (S3AInputStream) inner;
+      return s3a.getS3AStreamStatistics();
+    } else {
+      Assert.fail("Not an S3AInputStream: " + inner);
+      // never reached
+      return null;
+    }
+  }
+
+  /**
+   * Make times more readable, by adding a "," every three digits.
+   * @param nanos nanos or other large number
+   * @return a string for logging
+   */
+  protected static String toHuman(long nanos) {
+    return String.format(Locale.ENGLISH, "%,d", nanos);
+  }
+
+  /**
+   * Log the bandwidth of a timer as inferred from the number of
+   * bytes processed.
+   * @param timer timer
+   * @param bytes bytes processed in the time period
+   */
+  protected void bandwidth(NanoTimer timer, long bytes) {
+    LOG.info("Bandwidth = {}  MB/S",
+        timer.bandwidthDescription(bytes));
+  }
+
+  /**
+   * Work out the bandwidth in MB/s
+   * @param bytes bytes
+   * @param durationNS duration in nanos
+   * @return the number of megabytes/second of the recorded operation
+   */
+  public static double bandwidthMBs(long bytes, long durationNS) {
+    return (bytes * 1000.0 ) / durationNS;
+  }
+
+  /**
+   * A simple class for timing operations in nanoseconds, and for
+   * printing some useful results in the process.
+   * The timer starts when it is constructed and stops when one of the
+   * {@code end()} methods is called; the duration and the values derived
+   * from it are only meaningful after that call.
+   */
+  protected static class NanoTimer {
+    final long startTime;
+    long endTime;
+
+    public NanoTimer() {
+      startTime = now();
+    }
+
+    /**
+     * End the operation.
+     * @return the duration of the operation
+     */
+    public long end() {
+      endTime = now();
+      return duration();
+    }
+
+    /**
+     * End the operation; log the duration.
+     * @param format message
+     * @param args any arguments
+     * @return the duration of the operation
+     */
+    public long end(String format, Object... args) {
+      long d = end();
+      LOG.info("Duration of {}: {} nS",
+          String.format(format, args), toHuman(d));
+      return d;
+    }
+
+    long now() {
+      return System.nanoTime();
+    }
+
+    long duration() {
+      return endTime - startTime;
+    }
+
+    double bandwidth(long bytes) {
+      return S3AScaleTestBase.bandwidthMBs(bytes, duration());
+    }
+
+    /**
+     * Bandwidth as bytes per second.
+     * @param bytes bytes in
+     * @return the number of bytes per second this operation timed.
+     */
+    double bandwidthBytes(long bytes) {
+      // duration() is in nanoseconds, so scale by 1e9 to get a per-second
+      // rate, matching the units promised by the javadoc
+      return (bytes * 1.0e9) / duration();
+    }
+
+    /**
+     * How many nanoseconds per byte.
+     * @param bytes bytes processed in this time period
+     * @return the nanoseconds it took each byte to be processed;
+     * 0 if no bytes were processed
+     */
+    long nanosPerByte(long bytes) {
+      // integer division: guard against an empty read
+      return bytes != 0 ? duration() / bytes : 0;
+    }
+
+    /**
+     * Get a description of the bandwidth, even down to fractions of
+     * a MB.
+     * @param bytes bytes processed
+     * @return bandwidth
+     */
+    String bandwidthDescription(long bytes) {
+      // fixed locale so the separators agree with those used by toHuman()
+      return String.format(Locale.ENGLISH, "%,.6f", bandwidth(bytes));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
index 2930e96..af1883e 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
@@ -40,7 +40,6 @@ public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
 
-
   @Rule
   public Timeout testTimeout = new Timeout(30 * 60 * 1000);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
new file mode 100644
index 0000000..0c8b273
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Look at the performance of S3a operations
+ */
+public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestS3AInputStreamPerformance.class);
+
+  private S3AFileSystem s3aFS;
+  private Path testData;
+  private S3AFileStatus testDataStatus;
+  private FSDataInputStream in;
+  private S3AInstrumentation.InputStreamStatistics streamStatistics;
+  public static final int BLOCK_SIZE = 32 * 1024;
+  public static final int BIG_BLOCK_SIZE = 256 * 1024;
+
+  /** Tests only run if there is a named test file that can be read. */
+  private boolean testDataAvailable = true;
+  private String assumptionMessage = "test file";
+
+  /**
+   * Open the FS and the test data. The input stream is always set up here.
+   * @throws IOException
+   */
+  @Before
+  public void openFS() throws IOException {
+    Configuration conf = getConf();
+    String testFile =  conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
+    if (testFile.isEmpty()) {
+      assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
+      testDataAvailable = false;
+    } else {
+      testData = new Path(testFile);
+      s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf);
+      try {
+        testDataStatus = s3aFS.getFileStatus(testData);
+      } catch (IOException e) {
+        LOG.warn("Failed to read file {} specified in {}",
+            testFile, KEY_CSVTEST_FILE, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Cleanup: close the stream, close the FS.
+   */
+  @After
+  public void cleanup() {
+    IOUtils.closeStream(in);
+    IOUtils.closeStream(s3aFS);
+  }
+
+  /**
+   * Declare that the test requires the CSV test dataset
+   */
+  private void requireCSVTestData() {
+    Assume.assumeTrue(assumptionMessage, testDataAvailable);
+  }
+
+  /**
+   * Open the test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}
+   * @return the stream, wrapping an S3a one
+   * @throws IOException
+   */
+  FSDataInputStream openTestFile() throws IOException {
+    int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
+        DEFAULT_READ_BUFFER_SIZE);
+    FSDataInputStream stream = s3aFS.open(testData, bufferSize);
+    streamStatistics = getInputStreamStatistics(stream);
+    return stream;
+  }
+
+  /**
+   * Assert that the stream was only ever opened once.
+   */
+  protected void assertStreamOpenedExactlyOnce() {
+    assertOpenOperationCount(1);
+  }
+
+  /**
+   * Make an assertion count about the number of open operations
+   * @param expected the expected number
+   */
+  private void assertOpenOperationCount(int expected) {
+    assertEquals("open operations in " + streamStatistics,
+        expected, streamStatistics.openOperations);
+  }
+
+  /**
+   * Log how long an IOP took, by dividing the total time by the
+   * count of operations, printing in a human-readable form.
+   * If no operations were performed (for example, the test file is
+   * smaller than a single block), that fact is logged instead of
+   * dividing by zero.
+   * @param timer timing data
+   * @param count IOP count.
+   */
+  protected void logTimePerIOP(NanoTimer timer, long count) {
+    if (count == 0) {
+      // avoid an ArithmeticException from the integer division below
+      LOG.info("Time per IOP: no operations were performed");
+      return;
+    }
+    LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / count));
+  }
+
+  /**
+   * Time the opening of the test file, then read it one byte at a time,
+   * verifying that the number of bytes read matches the file length and
+   * logging the cost of an open() call expressed in bytes read.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testTimeToOpenAndReadWholeFileByByte() throws Throwable {
+    requireCSVTestData();
+    describe("Open the test file %s and read it byte by byte", testData);
+    long len = testDataStatus.getLen();
+    NanoTimer timeOpen = new NanoTimer();
+    in = openTestFile();
+    timeOpen.end("Open stream");
+    NanoTimer readTimer = new NanoTimer();
+    long count = 0;
+    // read() returns a negative value at the end of the stream
+    while (in.read() >= 0) {
+      count++;
+    }
+    readTimer.end("Time to read %d bytes", len);
+    bandwidth(readTimer, count);
+    assertEquals("Not enough bytes were read", len, count);
+    // both divisions are integer divisions: skip the comparison when
+    // nothing was read, or when a byte took under a nanosecond to read
+    long nanosPerByte = count > 0 ? readTimer.nanosPerByte(count) : 0;
+    if (nanosPerByte > 0) {
+      LOG.info("An open() call has the equivalent duration of reading {} bytes",
+          toHuman(timeOpen.duration() / nanosPerByte));
+    }
+  }
+
+  /**
+   * Read the test file in blocks of {@link #BLOCK_SIZE} bytes, timing
+   * each block and the overall read, then log the bandwidth and the
+   * stream statistics. Any trailing partial block is not read.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
+    requireCSVTestData();
+    describe("Open the test file %s and read it in blocks of size %d",
+        testData, BLOCK_SIZE);
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    byte[] block = new byte[BLOCK_SIZE];
+    NanoTimer timer2 = new NanoTimer();
+    long count = 0;
+    // implicitly rounding down here
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      int offset = 0;
+      int remaining = BLOCK_SIZE;
+      NanoTimer blockTimer = new NanoTimer();
+      int reads = 0;
+      // a single read() may return fewer bytes than requested, so keep
+      // reading until the block is full
+      while (remaining > 0) {
+        int bytesRead = in.read(block, offset, remaining);
+        reads++;
+        if (bytesRead < 0) {
+          // end of stream: stop before the -1 corrupts the counters
+          break;
+        }
+        remaining -= bytesRead;
+        offset += bytesRead;
+        count += bytesRead;
+      }
+      blockTimer.end("Reading block %d in %d reads", i, reads);
+    }
+    timer2.end("Time to read %d bytes in %d blocks", len, blockCount);
+    bandwidth(timer2, count);
+    LOG.info("{}", streamStatistics);
+  }
+
+  @Test
+  public void testLazySeekEnabled() throws Throwable {
+    requireCSVTestData();
+    describe("Verify that seeks do not trigger any IO");
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + BLOCK_SIZE - 1);
+    }
+    in.seek(0);
+    blockCount++;
+    timer.end("Time to execute %d seeks", blockCount);
+    logTimePerIOP(timer, blockCount);
+    LOG.info("{}", streamStatistics);
+    assertOpenOperationCount(0);
+    assertEquals("bytes read", 0, streamStatistics.bytesRead);
+  }
+
+  @Test
+  public void testReadAheadDefault() throws Throwable {
+    requireCSVTestData();
+    describe("Verify that a series of forward skips within the readahead" +
+        " range do not close and reopen the stream");
+    executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  @Test
+  public void testReadaheadOutOfRange() throws Throwable {
+    requireCSVTestData();
+    try {
+      in = openTestFile();
+      in.setReadahead(-1L);
+      fail("Stream should have rejected the request "+ in);
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testReadBigBlocksAvailableReadahead() throws Throwable {
+    requireCSVTestData();
+    describe("set readahead to available bytes only");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, 0);
+    // expect that the stream will have had lots of opens
+    assertTrue("not enough open operations in " + streamStatistics,
+        streamStatistics.openOperations > 1);
+  }
+
+  @Test
+  public void testReadBigBlocksBigReadahead() throws Throwable {
+    requireCSVTestData();
+    describe("Read big blocks with a big readahead");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  /**
+   * Execute a seek+read sequence
+   * @param blockSize block size for seeks
+   * @param readahead what the readahead value of the stream should be
+   * @throws IOException IO problems
+   */
+  protected void executeSeekReadSequence(long blockSize,
+      long readahead) throws IOException {
+    requireCSVTestData();
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    in.setReadahead(readahead);
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / blockSize;
+    LOG.info("Reading {} blocks, readahead = {}",
+        blockCount, readahead);
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + blockSize - 1);
+      // this is the read
+      assertTrue(in.read() >= 0);
+    }
+    timer.end("Time to execute %d seeks of distance %d with readahead = %d",
+        blockCount,
+        blockSize,
+        readahead);
+    logTimePerIOP(timer, blockCount);
+    LOG.info("Effective bandwidth {} MB/S",
+        timer.bandwidthDescription(streamStatistics.bytesRead -
+            streamStatistics.bytesSkippedOnSeek));
+    LOG.info("{}", streamStatistics);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27c4e90e/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties 
b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index ced0687..bc85425 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -16,3 +16,6 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n
+
+# for debugging low level S3a operations, uncomment this line
+# log4j.logger.org.apache.hadoop.fs.s3a=DEBUG


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to