[ https://issues.apache.org/jira/browse/HADOOP-18521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17639972#comment-17639972 ]
ASF GitHub Bot commented on HADOOP-18521:
-----------------------------------------

anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033420242


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
           throw new FileNotFoundException(ere.getMessage());
         }
       }
-      throw new IOException(ex);
+      throw ex;

Review Comment:
   Any specific reason for changing the exception type from IOException to AzureBlobFileSystemException?

##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -851,4 +880,67 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
     // by successive tests can lead to OOM based on the dev VM/machine capacity.
     System.gc();
   }
+
+  /**
+   * The first readahead closes the stream.
+   */
+  @Test
+  public void testStreamCloseInFirstReadAhead() throws Exception {
+    describe("close a stream during prefetch, verify outcome is good");
+
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName());
+    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+
+    final long initialInProgressBlocksDiscarded = bufferManager.getInProgressBlocksDiscarded();
+
+    // on the first read the op succeeds but the stream is closed, which
+    // means that the request should be considered a failure
+    doAnswer(invocation -> {
+      LOG.info("in read call with {}", inputStream);
+      inputStream.close();
+      return successOp;
+    }).doReturn(successOp)
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    // kick these off before the read() to avoid various race conditions.
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream.read() would have waited for the read-ahead of the
+    // requested offset; as we are testing the ReadAheadManager directly,
+    // wait for the read-ahead threads to complete
+    waitForPrefetchCompletion();
+
+    // this triggers prefetching, which closes the stream while the read
+    // is queued, which causes the prefetch to not return,
+    // which triggers a blocking read, which will then fail.
+    intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () -> {
+      // should fail
+      int bytesRead = inputStream.read(new byte[ONE_KB]);
+      // diagnostics info if failure wasn't raised
+      return "read " + bytesRead + " bytes from " + inputStream;
+    });
+
+    Assertions.assertThat(bufferManager.getCompletedReadListCopy())
+        .filteredOn(rb -> rb.getStream() == inputStream)
+        .describedAs("list of completed reads")
+        .isEmpty();
+
+    IOStatisticsStore ios = inputStream.getIOStatistics();
+    assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED)
+        .describedAs("blocks discarded by %s", inputStream)
+        .isGreaterThan(0);
+
+    // at least one of the blocks was discarded in progress.
+    // this is guaranteed because the mockito callback must have been invoked
+    // by the prefetcher
+    Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded())
+        .describedAs("in progress blocks discarded")
+        .isGreaterThan(initialInProgressBlocksDiscarded);
+  }
+}

Review Comment:
   nit: Add a newline at the end of the file.
##########
hadoop-tools/hadoop-azure/src/test/resources/log4j.properties:
##########
@@ -26,6 +26,8 @@
 log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
 log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
 log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
 log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
+log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE

Review Comment:
   Was this added for testing? It might add a lot of logging.

##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java:
##########
@@ -162,4 +182,120 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
     this.isAnyByteConsumed = isAnyByteConsumed;
   }

+  @Override
+  public String toString() {
+    return super.toString()
+        + "{ status=" + status
+        + ", offset=" + offset
+        + ", length=" + length
+        + ", requestedLength=" + requestedLength
+        + ", bufferindex=" + bufferindex
+        + ", timeStamp=" + timeStamp
+        + ", isFirstByteConsumed=" + isFirstByteConsumed
+        + ", isLastByteConsumed=" + isLastByteConsumed
+        + ", isAnyByteConsumed=" + isAnyByteConsumed
+        + ", errException=" + errException
+        + ", stream=" + (stream != null ? stream.getStreamID() : "none")
+        + ", stream closed=" + isStreamClosed()
+        + ", latch=" + latch
+        + '}';
+  }
+
+  /**
+   * Is the stream closed?
+   * @return stream closed status.
+   */
+  public boolean isStreamClosed() {
+    return stream != null && stream.isClosed();
+  }
+
+  /**
+   * IOStatistics of the stream.
+   * @return the stream's IOStatisticsStore.
+   */
+  public IOStatisticsStore getStreamIOStatistics() {
+    return stream.getIOStatistics();
+  }
+
+  /**
+   * Start using the buffer.
+   * Sets the byte consumption flags as appropriate, then
+   * updates the stream statistics with the use of this buffer.
+   * @param offset offset in buffer where copy began
+   * @param bytesCopied bytes copied.
+   */
+  void dataConsumedByStream(int offset, int bytesCopied) {
+    boolean isFirstUse = !isAnyByteConsumed;
+    setAnyByteConsumed(true);
+    if (offset == 0) {
+      setFirstByteConsumed(true);
+    }
+    if (offset + bytesCopied == getLength()) {
+      setLastByteConsumed(true);
+    }
+    IOStatisticsStore iostats = getStreamIOStatistics();
+    if (isFirstUse) {
+      // first use, update the use count
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_USED, 1);
+    }
+    // every use, update the count of bytes read
+    iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied);
+  }
+
+  /**
+   * The (completed) buffer was evicted; update stream statistics
+   * as appropriate.
+   */
+  void evicted() {
+    IOStatisticsStore iostats = getStreamIOStatistics();
+    iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED, 1);
+    if (!isAnyByteConsumed()) {
+      // nothing was read, so consider it discarded.
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength());
+    }
+  }
+
+  /**
+   * The (completed) buffer was discarded; no data was read.
+   */
+  void discarded() {
+    if (getBufferindex() >= 0) {
+      IOStatisticsStore iostats = getStreamIOStatistics();
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength());
+    }
+  }
+
+  /**
+   * Release the buffer: update fields as appropriate.
+   */
+  void releaseBuffer() {
+    setBuffer(null);
+    setBufferindex(-1);
+  }
+

Review Comment:
   nit: Extra blank line.


> ABFS ReadBufferManager buffer sharing across concurrent HTTP requests
> ---------------------------------------------------------------------
>
>                 Key: HADOOP-18521
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18521
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/azure
>    Affects Versions: 3.3.2, 3.3.3, 3.3.4
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Critical
>              Labels: pull-request-available
>
> AbfsInputStream.close() can trigger the return of buffers used for active
> prefetch GET requests into the ReadBufferManager free buffer pool.
> A subsequent prefetch by a different stream in the same process may acquire
> this same buffer, risking corruption of its prefetched data, data which may
> then be returned to that other thread.
> On releases without the fix (3.3.2+), the bug can be avoided by disabling
> all prefetching:
> {code}
> fs.azure.readaheadqueue.depth = 0
> {code}
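
For applications where editing core-site.xml is impractical, the same workaround can be applied per-job through the standard Hadoop Configuration API. Below is a minimal, untested sketch; the class name, container/account URI, and file path are placeholders, and only the fs.azure.readaheadqueue.depth key comes from the description above.

{code}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DisableAbfsReadAhead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // a queue depth of 0 stops the stream from queueing prefetches,
    // so no buffer can be recycled while its GET is still in flight
    conf.setInt("fs.azure.readaheadqueue.depth", 0);

    // placeholder URI: substitute a real container and storage account
    URI abfs = new URI("abfs://container@account.dfs.core.windows.net/");
    try (FileSystem fs = FileSystem.get(abfs, conf);
         FSDataInputStream in = fs.open(new Path("/data/sample.bin"))) {
      // all reads are now served synchronously by the stream itself
      byte[] buffer = new byte[4096];
      int bytesRead = in.read(buffer);
      System.out.println("read " + bytesRead + " bytes");
    }
  }
}
{code}

This trades prefetch throughput for safety; judging by the test above, the fix in PR #5117 instead tracks buffers of closed streams and discards them, as counted by the new in-progress-blocks-discarded statistics.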