HADOOP-12444 Support lazy seek in S3AInputStream. Rajesh Balamohan via stevel


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9e3eff6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9e3eff6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9e3eff6

Branch: refs/heads/HDFS-1312
Commit: b9e3eff62a7415d8666656a75db69ff3e43f8e7e
Parents: fcb3fcd
Author: Steve Loughran <ste...@apache.org>
Authored: Sat Apr 9 11:24:39 2016 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Sat Apr 9 11:25:31 2016 +0100

----------------------------------------------------------------------
 .../apache/hadoop/fs/s3a/S3AInputStream.java    | 231 ++++++++++++++-----
 1 file changed, 172 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9e3eff6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 750358d..42178a4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
@@ -37,82 +36,128 @@ public class S3AInputStream extends FSInputStream {
   private long pos;
   private boolean closed;
   private S3ObjectInputStream wrappedStream;
-  private FileSystem.Statistics stats;
-  private AmazonS3Client client;
-  private String bucket;
-  private String key;
-  private long contentLength;
+  private final FileSystem.Statistics stats;
+  private final AmazonS3Client client;
+  private final String bucket;
+  private final String key;
+  private final long contentLength;
+  private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
   public static final long CLOSE_THRESHOLD = 4096;
 
-  public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
-                        FileSystem.Statistics stats) {
+  // Used by lazy seek
+  private long nextReadPos;
+
+  //Amount of data requested from the request
+  private long requestedStreamLen;
+
+  public S3AInputStream(String bucket, String key, long contentLength,
+      AmazonS3Client client, FileSystem.Statistics stats) {
     this.bucket = bucket;
     this.key = key;
     this.contentLength = contentLength;
     this.client = client;
     this.stats = stats;
     this.pos = 0;
+    this.nextReadPos = 0;
     this.closed = false;
     this.wrappedStream = null;
+    this.uri = "s3a://" + this.bucket + "/" + this.key;
   }
 
-  private void openIfNeeded() throws IOException {
-    if (wrappedStream == null) {
-      reopen(0);
-    }
-  }
-
-  private synchronized void reopen(long pos) throws IOException {
+  /**
+   * Opens up the stream at specified target position and for given length.
+   *
+   * @param targetPos target position
+   * @param length length requested
+   * @throws IOException
+   */
+  private synchronized void reopen(long targetPos, long length)
+      throws IOException {
+    requestedStreamLen = (length < 0) ? this.contentLength :
+        Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length)));
 
     if (wrappedStream != null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Aborting old stream to open at pos " + pos);
+        LOG.debug("Closing the previous stream");
       }
-      wrappedStream.abort();
-    }
-
-    if (pos < 0) {
-      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
-          +" " + pos);
+      closeStream(requestedStreamLen);
     }
 
-    if (contentLength > 0 && pos > contentLength-1) {
-      throw new EOFException(
-          FSExceptionMessages.CANNOT_SEEK_PAST_EOF
-          + " " + pos);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Requesting for "
+          + "targetPos=" + targetPos
+          + ", length=" + length
+          + ", requestedStreamLen=" + requestedStreamLen
+          + ", streamPosition=" + pos
+          + ", nextReadPosition=" + nextReadPos
+      );
     }
 
-    LOG.debug("Actually opening file " + key + " at pos " + pos);
-
-    GetObjectRequest request = new GetObjectRequest(bucket, key);
-    request.setRange(pos, contentLength-1);
-
+    GetObjectRequest request = new GetObjectRequest(bucket, key)
+        .withRange(targetPos, requestedStreamLen);
     wrappedStream = client.getObject(request).getObjectContent();
 
     if (wrappedStream == null) {
       throw new IOException("Null IO stream");
     }
 
-    this.pos = pos;
+    this.pos = targetPos;
   }
 
   @Override
   public synchronized long getPos() throws IOException {
-    return pos;
+    return (nextReadPos < 0) ? 0 : nextReadPos;
   }
 
   @Override
-  public synchronized void seek(long pos) throws IOException {
+  public synchronized void seek(long targetPos) throws IOException {
     checkNotClosed();
 
-    if (this.pos == pos) {
+    // Do not allow negative seek
+    if (targetPos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+          + " " + targetPos);
+    }
+
+    if (this.contentLength <= 0) {
+      return;
+    }
+
+    // Lazy seek
+    nextReadPos = targetPos;
+  }
+
+  /**
+   * Adjust the stream to a specific position.
+   *
+   * @param targetPos target seek position
+   * @param length length of content that needs to be read from targetPos
+   * @throws IOException
+   */
+  private void seekInStream(long targetPos, long length) throws IOException {
+    checkNotClosed();
+    if (wrappedStream == null) {
       return;
     }
 
-    LOG.debug(
-        "Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
-    reopen(pos);
+    // compute how much more to skip
+    long diff = targetPos - pos;
+    if (targetPos > pos) {
+      if ((diff + length) <= wrappedStream.available()) {
+        // already available in buffer
+        pos += wrappedStream.skip(diff);
+        if (pos != targetPos) {
+          throw new IOException("Failed to seek to " + targetPos
+              + ". Current position " + pos);
+        }
+        return;
+      }
+    }
+
+    // close the stream; if read the object will be opened at the new pos
+    closeStream(this.requestedStreamLen);
+    pos = targetPos;
   }
 
   @Override
@@ -120,22 +165,40 @@ public class S3AInputStream extends FSInputStream {
     return false;
   }
 
+  /**
+   * Perform lazy seek and adjust stream to correct position for reading.
+   *
+   * @param targetPos position from where data should be read
+   * @param len length of the content that needs to be read
+   */
+  private void lazySeek(long targetPos, long len) throws IOException {
+    //For lazy seek
+    if (targetPos != this.pos) {
+      seekInStream(targetPos, len);
+    }
+
+    //re-open at specific location if needed
+    if (wrappedStream == null) {
+      reopen(targetPos, len);
+    }
+  }
+
   @Override
   public synchronized int read() throws IOException {
     checkNotClosed();
+    if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
+      return -1;
+    }
 
-    openIfNeeded();
+    lazySeek(nextReadPos, 1);
 
     int byteRead;
     try {
       byteRead = wrappedStream.read();
-    } catch (SocketTimeoutException e) {
-      LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
-      reopen(pos);
-      byteRead = wrappedStream.read();
-    } catch (SocketException e) {
-      LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
-      reopen(pos);
+    } catch (SocketTimeoutException | SocketException e) {
+      LOG.info("Got exception while trying to read from stream,"
+          + " trying to recover " + e);
+      reopen(pos, 1);
       byteRead = wrappedStream.read();
     } catch (EOFException e) {
       return -1;
@@ -143,6 +206,7 @@ public class S3AInputStream extends FSInputStream {
 
     if (byteRead >= 0) {
       pos++;
+      nextReadPos++;
     }
 
     if (stats != null && byteRead >= 0) {
@@ -152,26 +216,34 @@ public class S3AInputStream extends FSInputStream {
   }
 
   @Override
-  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
     checkNotClosed();
 
-    openIfNeeded();
+    validatePositionedReadArgs(nextReadPos, buf, off, len);
+    if (len == 0) {
+      return 0;
+    }
+
+    if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
+      return -1;
+    }
+
+    lazySeek(nextReadPos, len);
 
     int byteRead;
     try {
       byteRead = wrappedStream.read(buf, off, len);
-    } catch (SocketTimeoutException e) {
-      LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
-      reopen(pos);
-      byteRead = wrappedStream.read(buf, off, len);
-    } catch (SocketException e) {
-      LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
-      reopen(pos);
+    } catch (SocketTimeoutException | SocketException e) {
+      LOG.info("Got exception while trying to read from stream,"
+          + " trying to recover " + e);
+      reopen(pos, len);
       byteRead = wrappedStream.read(buf, off, len);
     }
 
     if (byteRead > 0) {
       pos += byteRead;
+      nextReadPos += byteRead;
     }
 
     if (stats != null && byteRead > 0) {
@@ -191,15 +263,43 @@ public class S3AInputStream extends FSInputStream {
   public synchronized void close() throws IOException {
     super.close();
     closed = true;
+    closeStream(this.contentLength);
+  }
+
+  /**
+   * Close a stream: decide whether to abort or close, based on
+   * the length of the stream and the current position.
+   *
+   * This does not set the {@link #closed} flag.
+   * @param length length of the stream.
+   * @throws IOException
+   */
+  private void closeStream(long length) throws IOException {
     if (wrappedStream != null) {
-      if (contentLength - pos <= CLOSE_THRESHOLD) {
-        // Close, rather than abort, so that the http connection can be reused.
-        wrappedStream.close();
-      } else {
+      String reason = null;
+      boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
+      if (!shouldAbort) {
+        try {
+          reason = "Closed stream";
+          wrappedStream.close();
+        } catch (IOException e) {
+          // exception escalates to an abort
+          LOG.debug("When closing stream", e);
+          shouldAbort = true;
+        }
+      }
+      if (shouldAbort) {
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
         wrappedStream.abort();
+        reason = "Closed stream with abort";
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(reason + "; streamPos=" + pos
+            + ", nextReadPos=" + nextReadPos
+            + ", contentLength=" + length);
       }
+      wrappedStream = null;
     }
   }
 
@@ -219,6 +319,18 @@ public class S3AInputStream extends FSInputStream {
     return false;
   }
 
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "S3AInputStream{");
+    sb.append(uri);
+    sb.append(" pos=").append(pos);
+    sb.append(" nextReadPos=").append(nextReadPos);
+    sb.append(" contentLength=").append(contentLength);
+    sb.append('}');
+    return sb.toString();
+  }
+
   /**
    * Subclass {@code readFully()} operation which only seeks at the start
    * of the series of operations; seeking back at the end.
@@ -234,6 +346,7 @@ public class S3AInputStream extends FSInputStream {
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
       throws IOException {
+    checkNotClosed();
     validatePositionedReadArgs(position, buffer, offset, length);
     if (length == 0) {
       return;

Reply via email to