[ https://issues.apache.org/jira/browse/HADOOP-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837561#comment-17837561 ]

ASF GitHub Bot commented on HADOOP-19139:
-----------------------------------------

anmolanmol1234 commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1566774019


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -128,6 +133,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   /** ABFS instance to be held by the input stream to avoid GC close. */
   private final BackReference fsBackRef;
 
+  private AtomicBoolean fileStatusInformationPresent;
+
+  /**
+   * Defines if the inputStream has been used successfully once. Prefetches would
+   * start only after the first successful read.
+   */
+  private volatile boolean successfulUsage = false;
+  private final boolean pretechTriggerOnFirstRead;

Review Comment:
   typo: prefetch



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property, String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+
+  /**
+   * Disable the call of HEAD call for opening a inputStream. ReadPath API of server

Review Comment:
   "Disable the call of HEAD call" can be reworded to "Remove additional HEAD call".



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -226,7 +268,15 @@ public int read() throws IOException {
   }
 
   @Override
-  public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+  public synchronized int read(final byte[] b, final int off, final int len)
+      throws IOException {
+    int result = synchronizedRead(b, off, len);
+    successfulUsage =  true;
+    return result;
+  }
+
+  private int synchronizedRead(final byte[] b, final int off, final int len)

Review Comment:
   Why add this new method? Simply add a variable in the existing read method.
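
If the wrapper only exists to set the flag, the reviewer's suggestion amounts to something like the sketch below (a minimal sketch, not the patch's code; doRead is a hypothetical stand-in for the existing read body):

    import java.io.IOException;

    // Illustration only (not the actual AbfsInputStream): set the flag inline
    // in read() rather than delegating to a new synchronizedRead() wrapper.
    abstract class FirstUseFlagStream {
      private volatile boolean successfulUsage = false;

      // Hypothetical stand-in for the existing read(byte[], int, int) body.
      protected abstract int doRead(byte[] b, int off, int len) throws IOException;

      public synchronized int read(final byte[] b, final int off, final int len)
          throws IOException {
        final int result = doRead(b, off, len);
        successfulUsage = true;  // the single added statement; no wrapper needed
        return result;
      }
    }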



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
     // data need to be copied to user buffer from index bCursor,
    // AbfsInputStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart);
     // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }
 
   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {
     fCursor = readFrom;
     int totalBytesRead = 0;
     int lastBytesRead = 0;
     try {
       buffer = new byte[bufferSize];
+      boolean fileStatusInformationPresentBeforeRead = fileStatusInformationPresent.get();
       for (int i = 0;
-           i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+           i < MAX_OPTIMIZED_READ_ATTEMPTS && (!fileStatusInformationPresent.get()
+               || fCursor < getContentLength()); i++) {
         lastBytesRead = readInternal(fCursor, buffer, limit,
             (int) actualLen - limit, true);
         if (lastBytesRead > 0) {
           totalBytesRead += lastBytesRead;
+          boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+              && totalBytesRead == (int) actualLen;
           limit += lastBytesRead;
           fCursor += lastBytesRead;
           fCursorAfterLastRead = fCursor;
+          if (shouldBreak) {
+            break;
+          }
         }
       }
     } catch (IOException e) {
+      if (isNonRetriableOptimizedReadException(e)) {
+        throw e;

Review Comment:
   Yes, this is not clear: why should these exceptions not be retried?
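
The classification helper is not shown in this hunk; one hypothetical shape for it, assuming the intent is that failures such as 412 Precondition Failed (the ETag changed under the reader) cannot succeed on retry:

    // Hypothetical sketch only -- the patch's actual criteria are not visible
    // in this hunk. A 412 means the cached ETag no longer matches the blob, so
    // retrying the optimized read against the same state cannot succeed.
    private boolean isNonRetriableOptimizedReadException(final IOException e) {
      if (e instanceof AbfsRestOperationException) {
        final int status = ((AbfsRestOperationException) e).getStatusCode();
        return status == java.net.HttpURLConnection.HTTP_PRECON_FAILED;  // 412
      }
      return false;
    }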



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1099,7 +1099,9 @@ public AbfsRestOperation read(final String path,
     AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
         String.format("bytes=%d-%d", position, position + bufferLength - 1));
     requestHeaders.add(rangeHeader);
-    requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+    if (eTag == null || !eTag.isEmpty()) {

Review Comment:
   Shouldn't the condition be eTag != null?
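
What the reviewer appears to be after is a presence check before attaching the conditional-request header; a sketch against the diff's identifiers, reusing the StringUtils helper the patch already uses elsewhere:

    // Attach If-Match only when an ETag is actually known; without an ETag the
    // read goes out unconditionally and the response supplies the ETag instead.
    if (StringUtils.isNotEmpty(eTag)) {
      requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
    }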



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -145,6 +159,10 @@ public AbfsInputStream(
     this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
+    this.fileStatusInformationPresent = new AtomicBoolean(StringUtils.isNotEmpty(eTag));
+    this.pretechTriggerOnFirstRead =

Review Comment:
   typo: prefetch



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -306,7 +366,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
     //If buffer is empty, then fill the buffer.
     if (bCursor == limit) {
       //If EOF, then return -1
-      if (fCursor >= contentLength) {
+      if (fileStatusInformationPresent.get() && fCursor >= getContentLength()) {

Review Comment:
   We can have a case where bCursor is already equal to the limit and we have not made a read yet, so the value of fileStatusInformationPresent is false.
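
In that case the guard deliberately falls through to a server read; condensed from the patch (not verbatim), with EOF discovery deferred to the 416 handling in readRemote:

    // With no file status yet there is no local contentLength to compare
    // against, so the EOF branch is skipped and the read goes to the server;
    // a 416 (Range Not Satisfiable) response then reports EOF as -1.
    if (bCursor == limit) {
      if (fileStatusInformationPresent.get() && fCursor >= getContentLength()) {
        return -1;  // EOF known locally from file status
      }
      // fileStatusInformationPresent == false: proceed to the remote read
    }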



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset, int length)
         throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       }
     }
+
+    /*
+     * When the inputStream is started, if the application tries to parallelly read
+     * ont he inputStream, the first read will be synchronized and the subsequent

Review Comment:
   typo: on the



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
     // data need to be copied to user buffer from index bCursor,
    // AbfsInputStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart);
     // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }
 
   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {
     fCursor = readFrom;
     int totalBytesRead = 0;
     int lastBytesRead = 0;
     try {
       buffer = new byte[bufferSize];
+      boolean fileStatusInformationPresentBeforeRead = fileStatusInformationPresent.get();
       for (int i = 0;
-           i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+           i < MAX_OPTIMIZED_READ_ATTEMPTS && (!fileStatusInformationPresent.get()
+               || fCursor < getContentLength()); i++) {
         lastBytesRead = readInternal(fCursor, buffer, limit,
             (int) actualLen - limit, true);
         if (lastBytesRead > 0) {
           totalBytesRead += lastBytesRead;
+          boolean shouldBreak = !fileStatusInformationPresentBeforeRead

Review Comment:
   How is this limited to the case when file status is not available?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -817,8 +817,8 @@ public AbfsInputStream openFileForRead(Path path,
       FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
           .orElse(null);
       String relativePath = getRelativePath(path);
-      String resourceType, eTag;
-      long contentLength;
+      String resourceType = null, eTag = null;
+      long contentLength = -1;
       ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
       /*
       * GetPathStatus API has to be called in case of:

Review Comment:
   This comment should be updated



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset, int length)
         throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       }
     }
+
+    /*
+     * When the inputStream is started, if the application tries to parallelly read
+     * ont he inputStream, the first read will be synchronized and the subsequent
+     * reads will be non-synchronized.
+     */
+    if (!successfulUsage) {

Review Comment:
   The variable name successfulUsage is not clear; it should be something like secondRead.
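
For reference, the pattern the code comment describes is double-checked locking on a first-use flag; a standalone sketch (readOnce is a hypothetical stand-in for the positional-read logic):

    // The first positional read takes the lock; once one read has succeeded
    // and file metadata is known, later reads use the lock-free fast path.
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      if (!successfulUsage) {
        synchronized (this) {
          if (!successfulUsage) {  // re-check under the lock
            final int bytesRead = readOnce(position, buffer, offset, length);
            successfulUsage = true;
            return bytesRead;
          }
        }
      }
      return readOnce(position, buffer, offset, length);  // lock-free fast path
    }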



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
     // data need to be copied to user buffer from index bCursor,
    // AbfsInputStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart);
     // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }
 
   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {

Review Comment:
   The variable should not be named as the opposite of what is expected; it should be isContentLengthInfoAvailable. The "Without" wording is causing confusion.
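
Roughly, the suggested signature (a sketch; body elided):

    // Positive-form flag: callers can pass fileStatusInformationPresent.get()
    // directly, avoiding double negation at every call site.
    private int optimisedRead(final byte[] b, final int off, final int len,
        final long readFrom, final long actualLen,
        final boolean isContentLengthInfoAvailable) throws IOException {
      // ... body unchanged, with the boolean checks inverted accordingly ...
    }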



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -564,11 +669,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+        abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation();
         if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
           throw new FileNotFoundException(ere.getMessage());
         }
+        /*
+         * Status 416 is sent when read range is out of contentLength range.
+         * This would happen only in the case if contentLength is not known before
+         * opening the inputStream.
+         */
+        if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+            && !fileStatusInformationPresent.get()) {
+          return -1;
+        }
       }
       throw new IOException(ex);
+    } finally {
+      if (!fileStatusInformationPresent.get() && abfsHttpOperation != null) {
+        initPathPropertiesFromReadPathResponseHeader(abfsHttpOperation);

Review Comment:
   The method name can be shortened.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -447,8 +537,7 @@ private boolean validate(final byte[] b, final int off, final int len)
     Preconditions.checkNotNull(b);
     LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
         off, len);
-
-    if (this.available() == 0) {
+    if (fileStatusInformationPresent.get() && this.available() == 0) {

Review Comment:
   Since we are already checking that file status info is available here, the if check (!fileStatusInformationPresent.get()) in the available() method will never get hit.
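
Condensed, the interaction being pointed at (the available() body here is inferred from this review comment, not quoted from the patch):

    // Call site: available() is now reached only when file status IS present...
    if (fileStatusInformationPresent.get() && this.available() == 0) {
      // ...
    }

    // ...so inside available(), this branch becomes dead on this path:
    // if (!fileStatusInformationPresent.get()) { ... }  // never true here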





> [ABFS]: No GetPathStatus call for opening AbfsInputStream
> ---------------------------------------------------------
>
>                 Key: HADOOP-19139
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19139
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>            Reporter: Pranav Saxena
>            Assignee: Pranav Saxena
>            Priority: Major
>              Labels: pull-request-available
>
> Read API gives contentLen and eTag of the path. This information would be
> used in future calls on that inputStream. Prior knowledge of the eTag is not
> of much importance.
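
A minimal sketch of the idea (hypothetical helper; assumes the standard Content-Range and ETag response headers and a writable contentLength field or setter):

    // Hypothetical sketch: after the first read, populate length and ETag from
    // the read response headers instead of an up-front GetPathStatus (HEAD).
    // Content-Range arrives as "bytes <start>-<end>/<total>".
    private void initFileStatusFromReadResponse(final AbfsHttpOperation op) {
      final String contentRange = op.getResponseHeader("Content-Range");
      if (contentRange != null && contentRange.contains("/")) {
        contentLength = Long.parseLong(
            contentRange.substring(contentRange.lastIndexOf('/') + 1));
      }
      eTag = op.getResponseHeader("ETag");  // assumes a non-final eTag field
      fileStatusInformationPresent.set(true);
    }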


