[ https://issues.apache.org/jira/browse/HADOOP-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837561#comment-17837561 ]
ASF GitHub Bot commented on HADOOP-19139:
-----------------------------------------

anmolanmol1234 commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1566774019

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -128,6 +133,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   /** ABFS instance to be held by the input stream to avoid GC close. */
   private final BackReference fsBackRef;
+  private AtomicBoolean fileStatusInformationPresent;
+
+  /**
+   * Defines if the inputStream has been used successfully once. Prefetches would
+   * start only after the first successful read.
+   */
+  private volatile boolean successfulUsage = false;
+  private final boolean pretechTriggerOnFirstRead;

Review Comment: Typo: "pretech" should be "prefetch".

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java: ##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property, String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+
+  /**
+   * Disable the call of HEAD call for opening a inputStream. ReadPath API of server

Review Comment: "Disable the call of HEAD call" can be reworded to "Remove the additional HEAD call".

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -226,7 +268,15 @@ public int read() throws IOException {
   }

   @Override
-  public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+  public synchronized int read(final byte[] b, final int off, final int len)
+      throws IOException {
+    int result = synchronizedRead(b, off, len);
+    successfulUsage = true;
+    return result;
+  }
+
+  private int synchronizedRead(final byte[] b, final int off, final int len)

Review Comment: Why add this new method? Simply add a variable in the existing read method.
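For readers following the thread, a minimal sketch of the alternative the reviewer suggests (a simplified stand-in class, not the actual AbfsInputStream; only the successfulUsage name comes from the patch): the flag can be flipped at the end of the existing synchronized read method instead of introducing a wrapper method, since the monitor is already held there.

```java
import java.io.IOException;

// Simplified stand-in for the stream; illustrates setting the flag inline.
class FirstReadFlagSketch {
  // Set once the first read completes successfully; prefetching can key off it.
  private volatile boolean successfulUsage = false;

  public synchronized int read(final byte[] b, final int off, final int len)
      throws IOException {
    int bytesRead = doRead(b, off, len);   // existing read logic would live here
    successfulUsage = true;                // flipped inline, no wrapper method needed
    return bytesRead;
  }

  private int doRead(byte[] b, int off, int len) throws IOException {
    return Math.min(len, b.length - off);  // placeholder body for the sketch
  }
}
```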
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
     // data need to be copied to user buffer from index bCursor,
     // AbfsInutStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }

   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {
     fCursor = readFrom;
     int totalBytesRead = 0;
     int lastBytesRead = 0;
     try {
       buffer = new byte[bufferSize];
+      boolean fileStatusInformationPresentBeforeRead = fileStatusInformationPresent.get();
       for (int i = 0;
-          i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+          i < MAX_OPTIMIZED_READ_ATTEMPTS && (!fileStatusInformationPresent.get()
+          || fCursor < getContentLength()); i++) {
         lastBytesRead = readInternal(fCursor, buffer, limit,
             (int) actualLen - limit, true);
         if (lastBytesRead > 0) {
           totalBytesRead += lastBytesRead;
+          boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+              && totalBytesRead == (int) actualLen;
           limit += lastBytesRead;
           fCursor += lastBytesRead;
           fCursorAfterLastRead = fCursor;
+          if (shouldBreak) {
+            break;
+          }
         }
       }
     } catch (IOException e) {
+      if (isNonRetriableOptimizedReadException(e)) {
+        throw e;

Review Comment: Yes, it is not clear why these exceptions should not be retried.

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java: ##########
@@ -1099,7 +1099,9 @@ public AbfsRestOperation read(final String path,
     AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
         String.format("bytes=%d-%d", position, position + bufferLength - 1));
     requestHeaders.add(rangeHeader);
-    requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+    if (eTag == null || !eTag.isEmpty()) {

Review Comment: Shouldn't the condition be eTag != null?
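A small illustration of the condition the reviewer is questioning (a standalone sketch with a hypothetical Header type standing in for AbfsHttpHeader, not the AbfsClient code): the If-Match header only makes sense when a non-empty ETag is actually known, so the guard would normally be a positive check.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of conditionally attaching an If-Match header.
class ConditionalIfMatchSketch {

  static final class Header {
    final String name;
    final String value;
    Header(String name, String value) {
      this.name = name;
      this.value = value;
    }
  }

  static List<Header> buildReadHeaders(long position, int bufferLength, String eTag) {
    List<Header> headers = new ArrayList<>();
    headers.add(new Header("Range",
        String.format("bytes=%d-%d", position, position + bufferLength - 1)));
    // Send If-Match only when an ETag is actually known; an empty or missing
    // value would make the conditional request meaningless.
    if (eTag != null && !eTag.isEmpty()) {
      headers.add(new Header("If-Match", eTag));
    }
    return headers;
  }
}
```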
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -145,6 +159,10 @@ public AbfsInputStream(
     this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
+    this.fileStatusInformationPresent = new AtomicBoolean(StringUtils.isNotEmpty(eTag));
+    this.pretechTriggerOnFirstRead =

Review Comment: Typo: "pretech" should be "prefetch".

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -306,7 +366,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
     //If buffer is empty, then fill the buffer.
     if (bCursor == limit) {
       //If EOF, then return -1
-      if (fCursor >= contentLength) {
+      if (fileStatusInformationPresent.get() && fCursor >= getContentLength()) {

Review Comment: We can have a case where bCursor is already equal to the limit and we have not made a read yet, so the value of fileStatusInformationPresent is false.

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset, int length)
       throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /*
+   * When the inputStream is started, if the application tries to parallelly read
+   * ont he inputStream, the first read will be synchronized and the subsequent

Review Comment: Typo: "ont he" should be "on the".

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
     // data need to be copied to user buffer from index bCursor,
     // AbfsInutStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }

   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {
     fCursor = readFrom;
     int totalBytesRead = 0;
     int lastBytesRead = 0;
     try {
       buffer = new byte[bufferSize];
+      boolean fileStatusInformationPresentBeforeRead = fileStatusInformationPresent.get();
       for (int i = 0;
-          i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+          i < MAX_OPTIMIZED_READ_ATTEMPTS && (!fileStatusInformationPresent.get()
+          || fCursor < getContentLength()); i++) {
         lastBytesRead = readInternal(fCursor, buffer, limit,
             (int) actualLen - limit, true);
         if (lastBytesRead > 0) {
           totalBytesRead += lastBytesRead;
+          boolean shouldBreak = !fileStatusInformationPresentBeforeRead

Review Comment: How is this limited to the case when file status is not available?
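To make the shouldBreak discussion concrete, here is a self-contained sketch (hypothetical names and interface; not the patch itself) of the loop shape being reviewed: when the file length is not yet known, the loop cannot be bounded by contentLength, so it stops either when the requested span has been filled or when the server signals end of data.

```java
import java.io.IOException;

// Sketch of an attempt-bounded read loop that works without a known content length.
class BoundedReadLoopSketch {

  interface RangeReader {
    /** Returns bytes read, or -1 when the requested offset is past end of file. */
    int read(long offset, byte[] buf, int bufOffset, int length) throws IOException;
  }

  static final int MAX_ATTEMPTS = 2;

  static int readUpTo(RangeReader reader, long startOffset, byte[] buf, int wanted)
      throws IOException {
    int total = 0;
    for (int attempt = 0; attempt < MAX_ATTEMPTS && total < wanted; attempt++) {
      int got = reader.read(startOffset + total, buf, total, wanted - total);
      if (got <= 0) {
        break;               // server reported EOF or returned nothing: stop retrying
      }
      total += got;
      if (total == wanted) {
        break;               // requested span filled; total length may still be unknown
      }
    }
    return total;
  }
}
```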
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java: ##########
@@ -817,8 +817,8 @@ public AbfsInputStream openFileForRead(Path path,
     FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
         .orElse(null);
     String relativePath = getRelativePath(path);
-    String resourceType, eTag;
-    long contentLength;
+    String resourceType = null, eTag = null;
+    long contentLength = -1;
     ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
     /*
      * GetPathStatus API has to be called in case of:

Review Comment: This comment should be updated.

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset, int length)
       throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /*
+   * When the inputStream is started, if the application tries to parallelly read
+   * ont he inputStream, the first read will be synchronized and the subsequent
+   * reads will be non-synchronized.
+   */
+  if (!successfulUsage) {

Review Comment: The variable name successfulUsage is not clear; it should be something like secondRead.

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
     // data need to be copied to user buffer from index bCursor,
     // AbfsInutStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }

   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {

Review Comment: The variable should not be the opposite of what is expected; I mean, it should be isContentLengthInfoAvailable. The "Without" keyword is causing confusion.

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ##########
@@ -564,11 +669,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+        abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation();
         if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
           throw new FileNotFoundException(ere.getMessage());
         }
+        /*
+         * Status 416 is sent when read range is out of contentLength range.
+         * This would happen only in the case if contentLength is not known before
+         * opening the inputStream.
+         */
+        if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+            && !fileStatusInformationPresent.get()) {
+          return -1;
+        }
       }
       throw new IOException(ex);
+    } finally {
+      if (!fileStatusInformationPresent.get() && abfsHttpOperation != null) {
+        initPathPropertiesFromReadPathResponseHeader(abfsHttpOperation);

Review Comment: The method name can be shortened.
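For context on the 416 handling quoted above, a minimal sketch (hypothetical class and method names; only the idea comes from the patch) of why a Range-Not-Satisfiable response can be translated into end-of-stream when the file length was never fetched up front:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

// Sketch of mapping HTTP status codes from a ranged GET onto stream semantics
// when the content length is not known in advance.
class RangeReadStatusSketch {
  static final int HTTP_NOT_FOUND = 404;
  static final int HTTP_RANGE_NOT_SATISFIABLE = 416;

  static int interpretFailure(int statusCode, boolean contentLengthKnown)
      throws IOException {
    if (statusCode == HTTP_NOT_FOUND) {
      throw new FileNotFoundException("Path does not exist");
    }
    // Without a prior HEAD call the stream may ask for a range beyond EOF;
    // the server answers 416, which for the reader simply means "no more data".
    if (statusCode == HTTP_RANGE_NOT_SATISFIABLE && !contentLengthKnown) {
      return -1;
    }
    throw new IOException("Read failed with HTTP status " + statusCode);
  }
}
```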
+ * This would happen only in the case if contentLength is not known before + * opening the inputStream. + */ + if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE + && !fileStatusInformationPresent.get()) { + return -1; + } } throw new IOException(ex); + } finally { + if (!fileStatusInformationPresent.get() && abfsHttpOperation != null) { + initPathPropertiesFromReadPathResponseHeader(abfsHttpOperation); Review Comment: method name can be shortened ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -447,8 +537,7 @@ private boolean validate(final byte[] b, final int off, final int len) Preconditions.checkNotNull(b); LOG.debug("read one block requested b.length = {} off {} len {}", b.length, off, len); - - if (this.available() == 0) { + if (fileStatusInformationPresent.get() && this.available() == 0) { Review Comment: Since we are already checking file status info is available here the if check ( !fileStatusInformationPresent.get()) in available() method will never get hit. > [ABFS]: No GetPathStatus call for opening AbfsInputStream > --------------------------------------------------------- > > Key: HADOOP-19139 > URL: https://issues.apache.org/jira/browse/HADOOP-19139 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Reporter: Pranav Saxena > Assignee: Pranav Saxena > Priority: Major > Labels: pull-request-available > > Read API gives contentLen and etag of the path. This information would be > used in future calls on that inputStream. Prior information of eTag is of not > much importance. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org