snvijaya commented on a change in pull request #2464:
URL: https://github.com/apache/hadoop/pull/2464#discussion_r540082701



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -224,6 +240,123 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
     return bytesToRead;
   }
 
+  private boolean shouldReadFully() {
+    return this.firstRead && this.context.readSmallFilesCompletely()
+        && this.contentLength <= this.bufferSize;
+  }
+
+  private boolean shouldReadLastBlock(int len) {
+    return this.firstRead && this.context.optimizeFooterRead()
+        && len == FOOTER_SIZE
+        && this.fCursor == this.contentLength - FOOTER_SIZE;
+  }
+
+  private int readFileCompletely(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    Preconditions.checkNotNull(b);
+    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+        off, len);
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (this.available() == 0) {
+      return -1;
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    buffer = new byte[bufferSize];
+    // data needs to be copied to the user buffer from index bCursor; bCursor
+    // has to be the current fCursor
+    bCursor = (int) fCursor;
+    fCursorAfterLastRead = fCursor;
+    int totalBytesRead = 0;
+    int loopCount = 0;
+    // Read from the beginning
+    fCursor = 0;
+    while (fCursor < contentLength) {
+      int bytesRead = readInternal(fCursor, buffer, limit,
+          (int) contentLength - limit, true);
+      if (bytesRead > 0) {
+        totalBytesRead += bytesRead;
+        limit += bytesRead;
+        fCursor += bytesRead;
+      }
+      if (loopCount++ >= 10) {

Review comment:
       As discussed, this logic needs to be revisited for:
   1. What if the requested data is already available from the partial read done so far?
   2. Reduce the loopCount, as the retry logic in the ABFS driver can make the overall client read expensive. Fail faster with just 1 or 2 tries.
   3. Never fail the read request just because the optimization code failed to read the full file. Fail fast and issue a read for the client-requested position instead (see the sketch after this list).
   4. All pointer fields need to be left in a valid state (bCursor, fCursor, fCursorAfterLastRead). In the failure case, fCursor can currently end up at a position different from the last seek done.

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -137,7 +142,13 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     }
     incrementReadOps();
     do {
-      lastReadBytes = readOneBlock(b, currentOff, currentLen);
+      if (shouldReadFully()) {

Review comment:
       The validation logic should be grouped into a method and called from here, 
so that code duplication is avoided across the three flows below (see the 
sketch after this comment).
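
   A minimal sketch of the suggested extraction; the name validateReadArgs and 
the exact grouping of checks are assumptions based on the blocks duplicated in 
this diff:

       // Hypothetical helper consolidating the checks currently duplicated in
       // readOneBlock, readFileCompletely and readLastBlock.
       private void validateReadArgs(final byte[] b, final int off, final int len)
           throws IOException {
         if (closed) {
           throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
         }
         Preconditions.checkNotNull(b);
         if (off < 0 || len < 0 || len > b.length - off) {
           throw new IndexOutOfBoundsException();
         }
       }

   The len == 0 and available() == 0 early returns would have to stay in the 
callers, since they produce return values rather than exceptions.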
   

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -224,6 +240,123 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
     return bytesToRead;
   }
 
+  private boolean shouldReadFully() {
+    return this.firstRead && this.context.readSmallFilesCompletely()
+        && this.contentLength <= this.bufferSize;
+  }
+
+  private boolean shouldReadLastBlock(int len) {
+    return this.firstRead && this.context.optimizeFooterRead()
+        && len == FOOTER_SIZE
+        && this.fCursor == this.contentLength - FOOTER_SIZE;
+  }
+
+  private int readFileCompletely(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    Preconditions.checkNotNull(b);
+    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+        off, len);
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (this.available() == 0) {
+      return -1;
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    buffer = new byte[bufferSize];
+    // data needs to be copied to the user buffer from index bCursor; bCursor
+    // has to be the current fCursor
+    bCursor = (int) fCursor;
+    fCursorAfterLastRead = fCursor;
+    int totalBytesRead = 0;
+    int loopCount = 0;
+    // Read from the beginning
+    fCursor = 0;
+    while (fCursor < contentLength) {
+      int bytesRead = readInternal(fCursor, buffer, limit,
+          (int) contentLength - limit, true);
+      if (bytesRead > 0) {
+        totalBytesRead += bytesRead;
+        limit += bytesRead;
+        fCursor += bytesRead;
+      }
+      if (loopCount++ >= 10) {
+        throw new IOException(
+            "Too many attempts in reading whole file " + path);
+      }
+    }
+    firstRead = false;
+    if (totalBytesRead == -1) {
+      return -1;
+    }
+    return copyToUserBuffer(b, off, len);
+  }
+
+  private int readLastBlock(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    Preconditions.checkNotNull(b);
+    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+        off, len);
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (this.available() == 0) {
+      return -1;
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    buffer = new byte[bufferSize];
+    // data needs to be copied to the user buffer from index bCursor; for
+    // small files bCursor will be contentLength - footer size, otherwise
+    // bufferSize - footer size
+    bCursor = (int) (Math.min(contentLength, bufferSize) - FOOTER_SIZE);
+    // the read API call is considered a single operation, but in reality the
+    // server could return partial data and the client has to retry until the
+    // last full block is read. So set fCursorAfterLastRead before the
+    // possibly multiple server calls
+    fCursorAfterLastRead = fCursor;
+    // 0 if contentLength < bufferSize
+    fCursor = Math.max(0, contentLength - bufferSize);
+    int totalBytesRead = 0;
+    int loopCount = 0;
+    while (fCursor < contentLength) {
+      int bytesRead = readInternal(fCursor, buffer, limit, bufferSize - limit,
+          true);
+      if (bytesRead > 0) {
+        totalBytesRead += bytesRead;
+        limit += bytesRead;
+        fCursor += bytesRead;
+      }
+      if (loopCount++ >= 10) {

Review comment:
       The comments added on readFileCompletely apply here too. 
   Also, recovering from the failure state might mean throwing away the data 
that was read in the attempts until then. Using the readAhead threads to do 
the reading would preserve the partially read data so it can serve the next 
read call (see the sketch below).
   



